diff options
author | Paul Jones <paulj@lshift.net> | 2009-08-28 15:40:18 +0100 |
---|---|---|
committer | Paul Jones <paulj@lshift.net> | 2009-08-28 15:40:18 +0100 |
commit | e664ae5208de72637485e2547caa761e245f53a3 (patch) | |
tree | 502f58faaf5d681fb8287bd659791b0e174f4e45 | |
parent | beaaf30dc12ae60d1a7acf9c9a47f3367b260549 (diff) | |
parent | e3a94b0accf234dcaae542ff13054e8f0d8b95bb (diff) | |
download | rabbitmq-server-e664ae5208de72637485e2547caa761e245f53a3.tar.gz |
Merged default into bug17880bug17880
52 files changed, 1510 insertions, 729 deletions
@@ -1,6 +1,8 @@ syntax: glob *.beam *~ +*.swp +*.patch erl_crash.dump syntax: regexp @@ -20,10 +20,10 @@ PYTHON=python ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are -# only available in R12B-3 upwards +# only available in R13B upwards (R13B is eshell 5.7.1) # # NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi) +USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi) endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests @@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e -# for the moment we don't use boot files because they introduce a -# dependency on particular versions of OTP applications -#all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app @@ -101,7 +98,8 @@ run-tests: all start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - ./scripts/rabbitmq-server -detached; sleep 1 + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ + ./scripts/rabbitmq-server ; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -115,8 +113,11 @@ force-snapshot: all stop-node: -$(ERL_CALL) -q +# code coverage will be created for subdirectory "ebin" of COVER_DIR +COVER_DIR=. + start-cover: all - echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL) + echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -136,7 +137,7 @@ srcdist: distclean sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -147,7 +148,7 @@ srcdist: distclean rm -rf $(TARGET_SRC_DIR) distclean: clean - make -C $(AMQP_CODEGEN_DIR) distclean + $(MAKE) -C $(AMQP_CODEGEN_DIR) distclean rm -rf dist find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; @@ -162,7 +163,8 @@ distclean: clean docs_all: $(MANPAGES) -install: all docs_all +install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +install: all docs_all install_dirs @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) @@ -171,13 +173,17 @@ install: all docs_all cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - mkdir -p $(SBIN_DIR) - cp scripts/rabbitmq-server $(SBIN_DIR) - cp scripts/rabbitmqctl $(SBIN_DIR) - cp scripts/rabbitmq-multi $(SBIN_DIR) + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \ + cp scripts/$$script $(TARGET_DIR)/sbin; \ + [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ + done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ for manpage in docs/*.$$section.pod; do \ cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ done; \ done + +install_dirs: + mkdir -p $(SBIN_DIR) + mkdir -p $(TARGET_DIR)/sbin diff --git a/calculate-relative b/calculate-relative new file mode 100755 index 00000000..3af18e8f --- /dev/null +++ b/calculate-relative @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# relpath.py +# R.Barran 30/08/2004 +# Retrieved from http://code.activestate.com/recipes/302594/ + +import os +import sys + +def relpath(target, base=os.curdir): + """ + Return a relative path to the target from either the current dir or an optional base dir. + Base can be a directory specified either as absolute or relative to current dir. + """ + + if not os.path.exists(target): + raise OSError, 'Target does not exist: '+target + + if not os.path.isdir(base): + raise OSError, 'Base is not a directory or does not exist: '+base + + base_list = (os.path.abspath(base)).split(os.sep) + target_list = (os.path.abspath(target)).split(os.sep) + + # On the windows platform the target may be on a completely different drive from the base. + if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]: + raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper() + + # Starting from the filepath root, work out how much of the filepath is + # shared by base and target. + for i in range(min(len(base_list), len(target_list))): + if base_list[i] <> target_list[i]: break + else: + # If we broke out of the loop, i is pointing to the first differing path elements. + # If we didn't break out of the loop, i is pointing to identical path elements. + # Increment i so that in all cases it points to the first differing path elements. + i+=1 + + rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:] + if (len(rel_list) == 0): + return "." + return os.path.join(*rel_list) + +if __name__ == "__main__": + print(relpath(sys.argv[1], sys.argv[2])) @@ -117,6 +117,13 @@ def genErl(spec): def genMethodHasContent(m): print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower()) + + def genMethodIsSynchronous(m): + hasNoWait = "nowait" in fieldNameList(m.arguments) + if m.isSynchronous and hasNoWait: + print "is_method_synchronous(#%s{nowait = NoWait}) -> not(NoWait);" % (m.erlangName()) + else: + print "is_method_synchronous(#%s{}) -> %s;" % (m.erlangName(), str(m.isSynchronous).lower()) def genMethodFieldTypes(m): """Not currently used - may be useful in future?""" @@ -246,6 +253,7 @@ def genErl(spec): -export([method_id/1]). -export([method_has_content/1]). +-export([is_method_synchronous/1]). -export([method_fieldnames/1]). -export([decode_method_fields/2]). -export([decode_properties/2]). @@ -266,6 +274,9 @@ bitvalue(undefined) -> 0. for m in methods: genMethodHasContent(m) print "method_has_content(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodIsSynchronous(m) + print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodFieldNames(m) print "method_fieldnames(Name) -> exit({unknown_method_name, Name})." diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod new file mode 100644 index 00000000..58ffea79 --- /dev/null +++ b/docs/rabbitmq-activate-plugins.1.pod @@ -0,0 +1,37 @@ +=head1 NAME + +rabbitmq-activate-plugins - command line tool for activating plugins +in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-activate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-activate-plugins is a command line tool for activating +plugins installed into the broker's plugins directory. + +=head1 EXAMPLES + +To activate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-activate-plugins + +=head1 SEE ALSO + +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, +L<rabbitmqctl(1)> + +=head1 AUTHOR + +The RabbitMQ Team <info@rabbitmq.com> + +=head1 REFERENCES + +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod index 23fd96ed..640609ee 100644 --- a/docs/rabbitmq-multi.1.pod +++ b/docs/rabbitmq-multi.1.pod @@ -15,22 +15,30 @@ scalable implementation of an AMQP broker. rabbitmq-multi scripts allows for easy set-up of a cluster on a single machine. -See also rabbitmq-server(1) for configuration information. +See also L<rabbitmq-server(1)> for configuration information. =head1 COMMANDS -start_all I<count> - start count nodes with unique names, listening on all IP addresses - and on sequential ports starting from 5672. +=over -status - print the status of all running RabbitMQ nodes +=item start_all I<count> -stop_all - stop all local RabbitMQ nodes +Start count nodes with unique names, listening on all IP addresses and +on sequential ports starting from 5672. -rotate_logs - rotate log files for all local and running RabbitMQ nodes +=item status + +Print the status of all running RabbitMQ nodes. + +=item stop_all + +Stop all local RabbitMQ nodes, + +=item rotate_logs + +Rotate log files for all local and running RabbitMQ nodes. + +=back =head1 EXAMPLES @@ -40,7 +48,7 @@ Start 3 local RabbitMQ nodes with unique, sequential port numbers: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-server(1), rabbitmqctl(1) +L<rabbitmq.conf(5)>, L<rabbitmq-server(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -48,4 +56,4 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod index 99a7cecc..d74ab8d9 100644 --- a/docs/rabbitmq-server.1.pod +++ b/docs/rabbitmq-server.1.pod @@ -16,43 +16,57 @@ Running rabbitmq-server in the foreground displays a banner message, and reports on progress in the startup sequence, concluding with the message "broker running", indicating that the RabbitMQ broker has been started successfully. To shut down the server, just terminate the -process or use rabbitmqctl(1). +process or use L<rabbitmqctl(1)>. =head1 ENVIRONMENT -B<RABBITMQ_MNESIA_BASE> - Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory - where Mnesia database files should be placed. +=over -B<RABBITMQ_LOG_BASE> - Defaults to /var/log/rabbitmq. Log files generated by the server - will be placed in this directory. +=item B<RABBITMQ_MNESIA_BASE> -B<RABBITMQ_NODENAME> - Defaults to rabbit. This can be useful if you want to run more - than one node per machine - B<RABBITMQ_NODENAME> should be unique - per erlang-node-and-machine combination. See clustering on a - single machine guide at - http://www.rabbitmq.com/clustering.html#single-machine for - details. +Defaults to F</var/lib/rabbitmq/mnesia>. Set this to the directory where +Mnesia database files should be placed. -B<RABBITMQ_NODE_IP_ADDRESS> - Defaults to 0.0.0.0. This can be changed if you only want to bind - to one network interface. +=item B<RABBITMQ_LOG_BASE> -B<RABBITMQ_NODE_PORT> - Defaults to 5672. +Defaults to F</var/log/rabbitmq>. Log files generated by the server will +be placed in this directory. -B<RABBITMQ_CLUSTER_CONFIG_FILE> - Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is - present it is used by the server to auto-configure a RabbitMQ - cluster. - See the clustering guide at http://www.rabbitmq.com/clustering.html - for details. +=item B<RABBITMQ_NODENAME> + +Defaults to rabbit. This can be useful if you want to run more than +one node per machine - B<RABBITMQ_NODENAME> should be unique per +erlang-node-and-machine combination. See clustering on a single +machine guide at +L<http://www.rabbitmq.com/clustering.html#single-machine> for details. + +=item B<RABBITMQ_NODE_IP_ADDRESS> + +Defaults to 0.0.0.0. This can be changed if you only want to bind to +one network interface. + +=item B<RABBITMQ_NODE_PORT> + +Defaults to 5672. + +=item B<RABBITMQ_CLUSTER_CONFIG_FILE> + +Defaults to F</etc/rabbitmq/rabbitmq_cluster.config>. If this file is +present it is used by the server to auto-configure a RabbitMQ cluster. +See the clustering guide at L<http://www.rabbitmq.com/clustering.html> +for details. + +=back =head1 OPTIONS -B<-detached> start the server process in the background +=over + +=item B<-detached> + +start the server process in the background + +=back =head1 EXAMPLES @@ -62,7 +76,7 @@ Run RabbitMQ AMQP server in the background: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-multi(1), rabbitmqctl(1) +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -70,4 +84,5 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> + diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod index 9b2536c3..a7bf4c09 100644 --- a/docs/rabbitmq.conf.5.pod +++ b/docs/rabbitmq.conf.5.pod @@ -1,10 +1,11 @@ =head1 NAME -/etc/rabbitmq/rabbitmq.conf - default settings for RabbitMQ AMQP server +F</etc/rabbitmq/rabbitmq.conf> - default settings for RabbitMQ AMQP +server =head1 DESCRIPTION -/etc/rabbitmq/rabbitmq.conf contains variable settings that override the +F</etc/rabbitmq/rabbitmq.conf> contains variable settings that override the defaults built in to the RabbitMQ startup scripts. The file is interpreted by the system shell, and so should consist of @@ -13,27 +14,35 @@ syntax is permitted (since the file is sourced using the shell "." operator), including line comments starting with "#". In order of preference, the startup scripts get their values from the -environment, from /etc/rabbitmq/rabbitmq.conf and finally from the -built-in default values. For example, for the B<RABBITMQ_NODENAME> setting, +environment, from F</etc/rabbitmq/rabbitmq.conf> and finally from the +built-in default values. For example, for the B<RABBITMQ_NODENAME> +setting, -B<RABBITMQ_NODENAME> - from the environment is checked first. If it is absent or equal to - the empty string, then +=over -B<NODENAME> - from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent - or set equal to the empty string then the default value from - the startup script is used. +=item B<RABBITMQ_NODENAME> + +from the environment is checked first. If it is absent or equal to the +empty string, then + +=item B<NODENAME> + +from L</etc/rabbitmq/rabbitmq.conf> is checked. If it is also absent +or set equal to the empty string then the default value from the +startup script is used. The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the environment variable names, with the B<RABBITMQ_> prefix removed: B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the -/etc/rabbitmq/rabbitmq.conf file, etc. +F</etc/rabbitmq/rabbitmq.conf> file, etc. + +=back =head1 EXAMPLES -The following is an example of a complete /etc/rabbitmq/rabbitmq.conf file -that overrides the default Erlang node name from "rabbit" to "hare": +The following is an example of a complete +F</etc/rabbitmq/rabbitmq.conf> file that overrides the default Erlang +node name from "rabbit" to "hare": # I am a complete /etc/rabbitmq/rabbitmq.conf file. # Comment lines start with a hash character. @@ -42,7 +51,7 @@ that overrides the default Erlang node name from "rabbit" to "hare": =head1 SEE ALSO -rabbitmq-server(1), rabbitmq-multi(1), rabbitmqctl(1) +L<rabbitmq-server(1)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -57,4 +66,4 @@ info@rabbitmq.com. =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 42156896..6d4aadeb 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -18,269 +18,388 @@ It performs all actions by connecting to one of the broker's nodes. =head1 OPTIONS -B<-n> I<node> - default node is C<rabbit@server>, where server is the local host. - On a host named C<server.example.com>, the node name of the - RabbitMQ Erlang node will usually be rabbit@server (unless - RABBITMQ_NODENAME has been set to some non-default value at broker - startup time). The output of hostname -s is usually the correct - suffix to use after the "@" sign. See rabbitmq-server(1) for - details of configuring the RabbitMQ broker. - -B<-q> - quiet output mode is selected with the B<-q> flag. Informational - messages are suppressed when quiet mode is in effect. +=over + +=item B<-n> I<node> + +Default node is C<rabbit@server>, where server is the local host. On +a host named C<server.example.com>, the node name of the RabbitMQ +Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME +has been set to some non-default value at broker startup time). The +output of hostname -s is usually the correct suffix to use after the +"@" sign. See rabbitmq-server(1) for details of configuring the +RabbitMQ broker. + +=item B<-q> + +Quiet output mode is selected with the B<-q> flag. Informational +messages are suppressed when quiet mode is in effect. + +=back =head1 COMMANDS =head2 APPLICATION AND CLUSTER MANAGEMENT -stop - stop the Erlang node on which RabbitMQ broker is running. - -stop_app - stop the RabbitMQ application, leaving the Erlang node running. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. - -start_app - start the RabbitMQ application. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. - -status - display various information about the RabbitMQ broker, such as - whether the RabbitMQ application on the current node, its version - number, what nodes are part of the broker, which of these are - running. - -force - return a RabbitMQ node to its virgin state. - Removes the node from any cluster it belongs to, removes all data - from the management database, such as configured users, vhosts and - deletes all persistent messages. - -force_reset - the same as I<force> command, but resets the node unconditionally, - regardless of the current management database state and cluster - configuration. - It should only be used as a last resort if the database or cluster - configuration has been corrupted. - -rotate_logs [suffix] - instruct the RabbitMQ node to rotate the log files. The RabbitMQ - broker will attempt to append the current contents of the log file - to the file with the name composed of the original name and the - suffix. It will create a new file if such a file does not already - exist. When no I<suffix> is specified, the empty log file is - simply created at the original location; no rotation takes place. - When an error occurs while appending the contents of the old log - file, the operation behaves in the same way as if no I<suffix> was - specified. - This command might be helpful when you are e.g. writing your own - logrotate script and you do not want to restart the RabbitMQ node. - -cluster I<clusternode> ... - instruct the node to become member of a cluster with the specified - nodes determined by I<clusternode> option(s). - See http://www.rabbitmq.com/clustering.html for more information - about clustering. +=over + +=item stop + +Stop the Erlang node on which RabbitMQ broker is running. + +=item stop_app + +Stop the RabbitMQ application, leaving the Erlang node running. This +command is typically run prior to performing other management actions +that require the RabbitMQ application to be stopped, e.g. I<reset>. + +=item start_app + +Start the RabbitMQ application. This command is typically run prior +to performing other management actions that require the RabbitMQ +application to be stopped, e.g. I<reset>. + +=item status + +Display various information about the RabbitMQ broker, such as whether +the RabbitMQ application on the current node, its version number, what +nodes are part of the broker, which of these are running. + +=item reset + +Return a RabbitMQ node to its virgin state. Removes the node from any +cluster it belongs to, removes all data from the management database, +such as configured users, vhosts and deletes all persistent messages. + +=item force_reset + +The same as I<reset> command, but resets the node unconditionally, +regardless of the current management database state and cluster +configuration. It should only be used as a last resort if the +database or cluster configuration has been corrupted. + +=item rotate_logs [suffix] + +Instruct the RabbitMQ node to rotate the log files. The RabbitMQ +broker will attempt to append the current contents of the log file to +the file with the name composed of the original name and the +suffix. It will create a new file if such a file does not already +exist. When no I<suffix> is specified, the empty log file is simply +created at the original location; no rotation takes place. When an +error occurs while appending the contents of the old log file, the +operation behaves in the same way as if no I<suffix> was specified. +This command might be helpful when you are e.g. writing your own +logrotate script and you do not want to restart the RabbitMQ node. + +=item cluster I<clusternode> ... + +Instruct the node to become member of a cluster with the specified +nodes determined by I<clusternode> option(s). See +L<http://www.rabbitmq.com/clustering.html> for more information about +clustering. + +=back =head2 USER MANAGEMENT -add_user I<username> I<password> - create a user named I<username> with (initial) password I<password>. +=over + +=item add_user I<username> I<password> -delete_user I<username> - delete the user named I<username>. +Create a user named I<username> with (initial) password I<password>. -change_password I<username> I<newpassword> - change the password for the user named I<username> to I<newpassword>. +=item delete_user I<username> -list_users - list all users. +Delete the user named I<username>. + +=item change_password I<username> I<newpassword> + +Change the password for the user named I<username> to I<newpassword>. + +=item list_users + +List all users, one per line. + +=back =head2 ACCESS CONTROL -add_vhost I<vhostpath> - create a new virtual host called I<vhostpath>. +=over + +=item add_vhost I<vhostpath> + +Create a new virtual host called I<vhostpath>. + +=item delete_vhost I<vhostpath> + +Delete a virtual host I<vhostpath>. This command deletes also all its +exchanges, queues and user mappings. + +=item list_vhosts + +List all virtual hosts, one per line. + +=item set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> -delete_vhost I<vhostpath> - delete a virtual host I<vhostpath>. - That command deletes also all its exchanges, queues and user - mappings. - -list_vhosts - list all virtual hosts. +Set the permissions for the user named I<username> in the virtual host +I<vhostpath>, granting I<configure>, I<write> and I<read> access to +resources with names matching the first, second and third I<regexp>, +respectively. -set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> - set the permissions for the user named I<username> in the virtual - host I<vhostpath>, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I<regexp>, respectively. +=item clear_permissions [-p I<vhostpath>] I<username> -clear_permissions [-p I<vhostpath>] I<username> - remove the permissions for the user named I<username> in the - virtual host I<vhostpath>. +Remove the permissions for the user named I<username> in the virtual +host I<vhostpath>. -list_permissions [-p I<vhostpath>] - list all the users and their permissions in the virtual host - I<vhostpath>. +=item list_permissions [-p I<vhostpath>] -list_user_permissions I<username> - list the permissions of the user named I<username> across all - virtual hosts. +List all the users and their permissions in the virtual host +I<vhostpath>. Each output line contains the username and their +I<configure>, I<write> and I<read> access regexps, separated by tab +characters. + +=item list_user_permissions I<username> + +List the permissions of the user named I<username> across all virtual +hosts. + +=back =head2 SERVER STATUS -list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] - list queue information by virtual host. If no I<queueinfoitem>s - are specified then then name and number of messages is displayed - for each queue. +=over + +=item list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] + +List queue information by virtual host. Each line printed +describes a queue, with the requested I<queueinfoitem> values +separated by tab characters. If no I<queueinfoitem>s are +specified then I<name> and I<messages> are assumed. + +=back =head3 Queue information items -=over 4 +=over + +=item name + +name of the queue + +=item durable + +whether the queue survives server restarts -name - URL-encoded name of the queue +=item auto_delete -durable - whether the queue survives server restarts +whether the queue will be deleted when no longer used -auto_delete - whether the queue will be deleted when no longer used +=item arguments -arguments - queue arguments +queue arguments -node - node on which the process associated with the queue resides +=item node -messages_ready - number of messages ready to be delivered to clients +node on which the process associated with the queue resides -messages_unacknowledged - number of messages delivered to clients but not yet - acknowledged +=item messages_ready -messages_uncommitted - number of messages published in as yet uncommitted transactions +number of messages ready to be delivered to clients -messages - sum of ready, unacknowledged and uncommitted messages +=item messages_unacknowledged -acks_uncommitted - number of acknowledgements received in as yet uncommitted - transactions +number of messages delivered to clients but not yet acknowledged -consumers - number of consumers +=item messages_uncommitted -transactions - number of transactions +number of messages published in as yet uncommitted transactions -memory - bytes of memory consumed by the Erlang process for the queue, - including stack, heap and internal structures +=item messages + +sum of ready, unacknowledged and uncommitted messages + +=item acks_uncommitted + +number of acknowledgements received in as yet uncommitted transactions + +=item consumers + +number of consumers + +=item transactions + +number of transactions + +=item memory + +bytes of memory consumed by the Erlang process for the queue, +including stack, heap and internal structures =back -list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] - list exchange information by virtual host. If no - I<exchangeinfoitem>s are specified then name and type is displayed - for each exchange. +=over + +=item list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] + +List queue information by virtual host. Each line printed describes an +exchange, with the requested I<exchangeinfoitem> values separated by +tab characters. If no I<exchangeinfoitem>s are specified then I<name> +and I<type> are assumed. + +=back =head3 Exchange information items -=over 4 +=over -name - URL-encoded name of the exchange +=item name -type - exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) +name of the exchange -durable - whether the exchange survives server restarts +=item type -auto_delete - whether the exchange is deleted when no longer used +exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) -arguments - exchange arguments +=item durable + +whether the exchange survives server restarts + +=item auto_delete + +whether the exchange is deleted when no longer used + +=item arguments + +exchange arguments =back -list_bindings [-p I<vhostpath>] - list bindings by virtual host. Each line contains exchange name, - routing key and queue name (all URL encoded) and arguments. +=over + +=item list_bindings [-p I<vhostpath>] -list_connections [I<connectioninfoitem> ...] - list connection information. If no I<connectioninfoitem>s are - specified then the user, peer address and peer port are displayed. +List bindings by virtual host. Each line printed describes a binding, +with the exchange name, routing key, queue name and arguments, +separated by tab characters. + +=item list_connections [I<connectioninfoitem> ...] + +List queue information by virtual host. Each line printed describes an +connection, with the requested I<connectioninfoitem> values separated +by tab characters. If no I<connectioninfoitem>s are specified then +I<user>, I<peer_address> and I<peer_port> are assumed. + +=back =head3 Connection information items -=over 4 +=over + +=item node + +node on which the process associated with the connection resides -node - node on which the process associated with the connection resides +=item address -address - server IP number +server IP number -port - server port +=item port -peer_address - peer address +server port -peer_port - peer port +=item peer_address -state - connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, - B<running>, B<closing>, B<closed>) +peer address -channels - number of channels using the connection +=item peer_port -user - username associated with the connection +peer port -vhost - URL-encoded virtual host +=item state -timeout - connection timeout +connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, +B<running>, B<closing>, B<closed>) -frame_max - maximum frame size (bytes) +=item channels -recv_oct - octets received +number of channels using the connection -recv_cnt - packets received +=item user -send_oct - octets sent +username associated with the connection -send_cnt - packets sent +=item vhost -send_pend - send queue size +virtual host + +=item timeout + +connection timeout + +=item frame_max + +maximum frame size (bytes) + +=item recv_oct + +octets received + +=item recv_cnt + +packets received + +=item send_oct + +octets sent + +=item send_cnt + +packets sent + +=item send_pend + +send queue size =back The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, defaulting -to I<"/">. The default can be overridden with the B<-p> flag. Result -columns for these commands and list_connections are tab-separated. +optional virtual host parameter for which to display results, +defaulting to I<"/">. The default can be overridden with the B<-p> +flag. + +=head1 OUTPUT ESCAPING + +Various items that may appear in the output of rabbitmqctl can contain +arbitrary octets. If a octet corresponds to a non-printing ASCII +character (values 0 to 31, and 127), it will be escaped in the output, +using a sequence consisting of a backslash character followed by three +octal digits giving the octet's value (i.e., as used in string +literals in the C programming language). An octet corresponding to +the backslash character (i.e. with value 92) will be escaped using a +sequence of two backslash characters. Octets with a value of 128 or +above are not escaped, in order to preserve strings encoded with +UTF-8. + +The items to which this escaping scheme applies are: + +=over + +=item * +Usernames + +=item * +Virtual host names + +=item * +Queue names + +=item * +Exchange names + +=item * +Regular expressions used for access control + +=back =head1 EXAMPLES @@ -309,4 +428,4 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 0057ea04..6fc6e464 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -15,6 +15,8 @@ %% actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {ssl_listeners, []}, + {ssl_options, []}, {extra_startup_steps, []}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 784c21b3..d1a2f3bd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,6 +64,7 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). %%---------------------------------------------------------------------------- @@ -74,7 +75,8 @@ -type(maybe(T) :: T | 'none'). -type(erlang_node() :: atom()). --type(socket() :: port()). +-type(ssl_socket() :: #ssl_socket{}). +-type(socket() :: port() | ssl_socket()). -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c74d4533..fa2844fd 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -1,7 +1,8 @@ -VERSION=0.0.0 -SOURCE_TARBALL_DIR=../../../dist +TARBALL_DIR=../../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + TOP_DIR=$(shell pwd) #Under debian we do not want to check build dependencies, since that #only checks build-dependencies using rpms, not debs @@ -23,13 +24,16 @@ rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp - cp $(TOP_DIR)/$(TARBALL) SOURCES + cp $(TARBALL_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec - cp init.d SOURCES/rabbitmq-server.init cp ${COMMON_DIR}/* SOURCES/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + SOURCES/rabbitmq-server.init cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index eb953b81..7f442831 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate +Source4: rabbitmq-asroot-script-wrapper URL: http://www.rabbitmq.com/ BuildRequires: erlang, python-simplejson Requires: erlang, logrotate @@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` +%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -34,6 +36,8 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} +cp %{S:4} %{_rabbit_asroot_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} make %{?_smp_mflags} %install @@ -51,6 +55,7 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper new file mode 100644 index 00000000..0dd1c0fb --- /dev/null +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -0,0 +1,53 @@ +#!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 296a77d1..f1a9b1ff 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -1,4 +1,35 @@ #!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + # Escape spaces and quotes, because shell is revolting. for arg in "$@" ; do # Escape quotes in parameters, so that they're passed through cleanly. diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init index 77a6a89a..e71562f8 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/common/rabbitmq-server.init @@ -8,10 +8,10 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server -# Default-Start: -# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network +# Default-Start: +# Default-Stop: # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -24,13 +24,14 @@ USER=rabbitmq NODE_COUNT=1 ROTATE_SUFFIX= -LOCK_FILE=/var/lock/subsys/$NAME +DEFAULTS_FILE= # This is filled in when building packages +LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/sysconfig/rabbitmq ] ; then - . /etc/sysconfig/rabbitmq +if [ -f "$DEFAULTS_FILE" ] ; then + . $DEFAULTS_FILE fi RETVAL=0 @@ -41,7 +42,8 @@ start_rabbitmq () { $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err case "$?" in 0) - echo SUCCESS && touch $LOCK_FILE + echo SUCCESS + [ -n "$LOCK_FILE" ] && touch $LOCK_FILE RETVAL=0 ;; 1) @@ -52,7 +54,7 @@ start_rabbitmq () { echo FAILED - check /var/log/rabbitmq/startup_log, _err RETVAL=1 ;; - esac + esac set -e } @@ -62,10 +64,12 @@ stop_rabbitmq () { if [ $RETVAL = 0 ] ; then $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err + if [ $RETVAL = 0 ] ; then + # Try to stop epmd if run by the rabbitmq user + pkill -u rabbitmq epmd || : + [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else - rm -rf $LOCK_FILE + echo FAILED - check /var/log/rabbitmq/shutdown_log, _err fi else echo No nodes running @@ -119,19 +123,14 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - condrestart|try-restart) + force-reload|reload|restart|condrestart|try-restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=2 + RETVAL=1 ;; esac diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 67fabae0..dafaf9ce 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,8 +1,9 @@ TARBALL_DIR=../../../dist -TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server SIGNING_KEY_ID=056E8E56 @@ -21,6 +22,10 @@ package: clean tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + $(UNPACKED_DIR)/debian/rabbitmq-server.init chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d deleted file mode 100644 index a35a60ec..00000000 --- a/packaging/debs/Debian/debian/init.d +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/sh -### BEGIN INIT INFO -# Provides: rabbitmq -# Required-Start: $remote_fs $network -# Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Description: RabbitMQ broker -# Short-Description: Enable AMQP service provided by RabbitMQ broker -### END INIT INFO - -PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi -NAME=rabbitmq-server -DESC=rabbitmq-server -USER=rabbitmq -NODE_COUNT=1 -ROTATE_SUFFIX= - -test -x $DAEMON || exit 0 - -# Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq -fi - -RETVAL=0 -set -e - -start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err - case "$?" in - 0) - echo SUCCESS - RETVAL=0 - ;; - 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err - RETVAL=1 - ;; - esac - set -e -} - -stop_rabbitmq () { - set +e - status_rabbitmq quiet - if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err - RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err - fi - else - echo No nodes running - RETVAL=0 - fi - set -e -} - -status_rabbitmq() { - set +e - if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 - else - $DAEMON status > /dev/null 2>&1 - fi - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -rotate_logs_rabbitmq() { - set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -restart_rabbitmq() { - stop_rabbitmq - start_rabbitmq -} - -case "$1" in - start) - echo -n "Starting $DESC: " - start_rabbitmq - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - stop_rabbitmq - echo "$NAME." - ;; - status) - status_rabbitmq - ;; - rotate-logs) - echo -n "Rotating log files for $DESC: " - rotate_logs_rabbitmq - ;; - force-reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - *) - echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2 - RETVAL=1 - ;; -esac - -exit $RETVAL diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 31904851..365eea6e 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -3,7 +3,7 @@ include /usr/share/cdbs/1/rules/debhelper.mk include /usr/share/cdbs/1/class/makefile.mk -RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ +RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/ DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/ @@ -17,3 +17,6 @@ install/rabbitmq-server:: for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done + for script in rabbitmq-activate-plugins; do \ + install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ + done diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index b3988696..4eade6c7 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) \ + $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index b8096d20..1826d5c4 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -42,7 +42,7 @@ use_parallel_build yes build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ - TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ + TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man @@ -61,9 +61,7 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${sbindir}/rabbitmq-env reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ ${sbindir}/rabbitmq-multi \ ${sbindir}/rabbitmq-server \ @@ -83,14 +81,19 @@ post-destroot { xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-multi + xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \ + ${wrappersbin}/rabbitmq-activate-plugins reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ ${wrappersbin}/rabbitmq-multi reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-activate-plugins + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-activate-plugins file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper new file mode 100644 index 00000000..c4488dcb --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper @@ -0,0 +1,12 @@ +#!/bin/bash +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 59101cb2..387becb3 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,15 +4,16 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) + $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/activate-plugins b/scripts/rabbitmq-activate-plugins index 52f7ddbe..5ce64c68 100755 --- a/scripts/activate-plugins +++ b/scripts/rabbitmq-activate-plugins @@ -30,18 +30,18 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env -RABBITMQ_EBIN=`dirname $0`/../ebin -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins" +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins" exec erl \ -pa "$RABBITMQ_EBIN" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ - -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ - -noinput \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ + -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" diff --git a/scripts/activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat index 8bef4ad2..3540bf2d 100644 --- a/scripts/activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env new file mode 100755 index 00000000..69ddbcfe --- /dev/null +++ b/scripts/rabbitmq-env @@ -0,0 +1,53 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Determine where this script is really located +SCRIPT_PATH="$0" +while [ -h "$SCRIPT_PATH" ] ; do + FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` + if [ "$?" != "0" ]; then + REL_PATH=`readlink $SCRIPT_PATH` + if expr "$REL_PATH" : '/.*' > /dev/null; then + SCRIPT_PATH="$REL_PATH" + else + SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH" + fi + else + SCRIPT_PATH=$FULL_PATH + fi +done + +SCRIPT_DIR=`dirname $SCRIPT_PATH` +RABBITMQ_HOME="${SCRIPT_DIR}/.." + +# Load configuration from the rabbitmq.conf file +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1d0c785f..7db4cb70 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -60,7 +60,7 @@ export \ set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a30c0889..8abf13f1 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 41e84639..547220b4 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -41,7 +41,7 @@ LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia SERVER_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -75,7 +75,7 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' -RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin" +RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" RABBITMQ_EBIN_PATH="" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b4868841..a784fee3 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index c57978c0..9c45e73d 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,12 +30,12 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index e4dccfba..5111724f 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c4..c74b39a9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,7 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index 6ce90d93..ef1e0049 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -133,13 +133,13 @@ start(normal, []) -> ok = start_child(rabbit_log), ok = rabbit_hooks:start(), - ok = rabbit_amqqueue:start(), + ok = rabbit_binary_generator: + check_empty_content_body_frame_size(), {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), + ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) @@ -168,12 +168,27 @@ start(normal, []) -> {"TCP listeners", fun () -> ok = rabbit_networking:start(), - {ok, TCPListeners} = application:get_env(tcp_listeners), + {ok, TcpListeners} = application:get_env(tcp_listeners), lists:foreach( fun ({Host, Port}) -> ok = rabbit_networking:start_tcp_listener(Host, Port) end, - TCPListeners) + TcpListeners) + end}, + {"SSL listeners", + fun () -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + + {ok, SslOpts} = application:get_env(ssl_options), + + [rabbit_networking:start_ssl_listener + (Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end end}]), io:format("~nbroker running~n"), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16..309c9a0e 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). @@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; + + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c5..f05f7880 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58b94234..16b7c938 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 37e4d189..cf20520e 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -329,22 +329,23 @@ format_info_item(Items, Key) -> {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), case Info of {_, #resource{name = Name}} -> - url_encode(Name); + escape(Name); _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> inet_parse:ntoa(Value); _ when is_pid(Value) -> atom_to_list(node(Value)); _ when is_binary(Value) -> - url_encode(Value); + escape(Value); _ -> io_lib:format("~w", [Value]) end. display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); + io:format("~s~n", [escape(I)]); (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + display_row([escape(V) + || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; @@ -356,32 +357,23 @@ call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). -%% url_encode is lifted from ibrowse, modified to preserve some characters -url_encode(Bin) when binary(Bin) -> - url_encode_char(lists:reverse(binary_to_list(Bin)), []). - -url_encode_char([X | T], Acc) when X >= $a, X =< $z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $A, X =< $Z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $0, X =< $9 -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) - when X == $-; X == $_; X == $.; X == $~; - X == $!; X == $*; X == $'; X == $(; - X == $); X == $;; X == $:; X == $@; - X == $&; X == $=; X == $+; X == $$; - X == $,; X == $/; X == $?; X == $%; - X == $#; X == $[; X == $] -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) -> - url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]); -url_encode_char([], Acc) -> +%% escape does C-style backslash escaping of non-printable ASCII +%% characters. We don't escape characters above 127, since they may +%% form part of UTF-8 strings. + +escape(Bin) when binary(Bin) -> + escape_char(lists:reverse(binary_to_list(Bin)), []). + +escape_char([$\\ | T], Acc) -> + escape_char(T, [$\\, $\\ | Acc]); +escape_char([X | T], Acc) when X > 32, X /= 127 -> + escape_char(T, [X | Acc]); +escape_char([X | T], Acc) -> + escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), + $0 + (X band 7) | Acc]); +escape_char([], Acc) -> Acc. -d2h(N) when N<10 -> N+$0; -d2h(N) -> N+$a-10. - list_replace(Find, Replace, List) -> [case X of Find -> Replace; _ -> X end || X <- List]. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be00503..b789fbd1 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 0a68c9ad..ed0066fe 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) -> spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue end, erlang:monitor(process, Parent)) end), @@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case inet:getstat(Sock, [StatName]) of + case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> F({NewStatVal, 0}); diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 00000000..b0d57cb2 --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. +handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 00000000..3de2d843 --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e99..ca942d7c 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). -update() -> - gen_server:cast(?SERVER, update). +-endif. %%---------------------------------------------------------------------------- -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abf4c7cc..95a274e3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,9 +50,11 @@ -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,6 +67,8 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,7 +104,7 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -110,12 +114,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -360,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -374,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -444,3 +460,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0a..37e20335 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -149,6 +149,11 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -192,28 +197,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of @@ -250,12 +243,10 @@ delete_cluster_nodes_config() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. init_db(ClusterNodes) -> - WasDiskNode = mnesia:system_info(use_dir), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + case mnesia:system_info(use_dir) of + true -> case check_schema_integrity() of ok -> ok; @@ -270,22 +261,18 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); - true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + false -> + ok = create_schema() end; {ok, [_|_]} -> - ok = wait_for_tables(), - ok = create_local_table_copies( - case IsDiskNode of - true -> disc; - false -> ram - end); + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end); {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -336,40 +323,36 @@ create_tables() -> table_definitions()), ok. +table_has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, lists:foreach( fun({Tab, TabDef}) -> - HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), - HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + HasDiscCopies = table_has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), StorageType = - case Type of - disc -> + if + Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies + true -> ram_copies end; %% unused code - commented out to keep dialyzer happy -%% disc_only -> +%% Type =:= disc_only -> %% if %% HasDiscCopies or HasDiscOnlyCopies -> %% disc_only_copies; %% true -> ram_copies %% end; - ram -> + Type =:= ram -> ram_copies end, ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl new file mode 100644 index 00000000..a5ccc8e9 --- /dev/null +++ b/src/rabbit_net.erl @@ -0,0 +1,132 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_net). +-include("rabbit.hrl"). +-include_lib("kernel/include/inet.hrl"). + +-export([async_recv/3, close/1, controlling_process/2, + getstat/2, peername/1, port_command/2, + send/2, sockname/1]). +%%--------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(stat_option() :: + 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | + 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). +-type(error() :: {'error', any()}). + +-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}). +-spec(close/1 :: (socket()) -> 'ok' | error()). +-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). +-spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). +-spec(peername/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(sockname/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(getstat/2 :: (socket(), [stat_option()]) -> + {'ok', [{stat_option(), integer()}]} | error()). + +-endif. + +%%--------------------------------------------------------------------------- + + +async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> + Pid = self(), + Ref = make_ref(), + + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), + + {ok, Ref}; + +async_recv(Sock, Length, infinity) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, -1); + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +close(Sock) when is_record(Sock, ssl_socket) -> + ssl:close(Sock#ssl_socket.ssl); + +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock). + + +controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> + ssl:controlling_process(Sock#ssl_socket.ssl, Pid); + +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + + +getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> + inet:getstat(Sock#ssl_socket.tcp, Stats); + +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + + +peername(Sock) when is_record(Sock, ssl_socket) -> + ssl:peername(Sock#ssl_socket.ssl); + +peername(Sock) when is_port(Sock) -> + inet:peername(Sock). + + +port_command(Sock, Data) when is_record(Sock, ssl_socket) -> + case ssl:send(Sock#ssl_socket.ssl, Data) of + ok -> + self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> + erlang:error(Reason) + end; + +port_command(Sock, Data) when is_port(Sock) -> + erlang:port_command(Sock, Data). + +send(Sock, Data) when is_record(Sock, ssl_socket) -> + ssl:send(Sock#ssl_socket.ssl, Data); + +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data). + + +sockname(Sock) when is_record(Sock, ssl_socket) -> + ssl:sockname(Sock#ssl_socket.ssl); + +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5a..eed21a01 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,18 +31,28 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, stop_tcp_listener/2, - on_node_down/1, active_listeners/0, node_listeners/1, - connections/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1]). +-export([start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info/1, + connection_info/2, connection_info_all/0, + connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, ssl_connection_upgrade/2, + tcp_listener_stopped/2, start_client/1]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-define(RABBIT_TCP_OPTS, [ + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, + {exit_on_close, false} + ]). %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -52,6 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). @@ -96,21 +107,24 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> {IPAddress, Name}. start_tcp_listener(Host, Port) -> - {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), + start_listener(Host, Port, "TCP Listener", + {?MODULE, start_client, []}). + +start_ssl_listener(Host, Port, SslOpts) -> + start_listener(Host, Port, "SSL Listener", + {?MODULE, ssl_connection_upgrade, [SslOpts]}). + +start_listener(Host, Port, Label, OnConnect) -> + {IPAddress, Name} = + check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, - [binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false}], + [IPAddress, Port, ?RABBIT_TCP_OPTS , {?MODULE, tcp_listener_started, []}, {?MODULE, tcp_listener_stopped, []}, - {?MODULE, start_client, []}]}, + OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -148,10 +162,27 @@ on_node_down(Node) -> start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = gen_tcp:controlling_process(Sock, Child), + ok = rabbit_net:controlling_process(Sock, Child), Child ! {go, Sock}, Child. +ssl_connection_upgrade(SslOpts, Sock) -> + {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), + PeerIp = inet_parse:ntoa(PeerAddress), + + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection from ~s:~p " + "to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]), + {error, Reason} + end. + connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 426b99eb..69dbc008 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -200,7 +200,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). peername(Sock) -> try - {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), AddressS = inet_parse:ntoa(Address), {AddressS, Port} catch @@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); @@ -323,8 +323,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - Ref = inet_op(fun () -> prim_inet:async_recv( - OldState#v1.sock, Length, -1) end), + Ref = inet_op(fun () -> rabbit_net:async_recv( + OldState#v1.sock, Length, infinity) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -539,7 +539,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( + ok = inet_op(fun () -> rabbit_net:send( Sock, <<"AMQP",1,1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>) end), @@ -675,23 +675,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:sockname(Sock), + {ok, {A, _}} = rabbit_net:sockname(Sock), A; i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:sockname(Sock), + {ok, {_, P}} = rabbit_net:sockname(Sock), P; i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:peername(Sock), + {ok, {A, _}} = rabbit_net:peername(Sock), A; i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:peername(Sock), + {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case inet:getstat(Sock, [SockStat]) of + case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{SockStat, StatVal}]} -> StatVal; {error, einval} -> undefined; {error, Error} -> throw({cannot_get_socket_stats, Error}) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5100ccd..b4cd30bc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -116,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -408,19 +483,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index e338ddfe..1679ce7c 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, - fun () -> gen_tcp:send(Sock, Data) end). + fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). @@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> ok. port_cmd(Sock, Data) -> - try erlang:port_command(Sock, Data) + try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) end. diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 92a47cf1..4a2e149b 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -33,28 +33,28 @@ -behaviour(gen_server). --export([start_link/7]). +-export([start_link/8]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {sock, on_startup, on_shutdown}). +-record(state, {sock, on_startup, on_shutdown, label}). %%-------------------------------------------------------------------- start_link(IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown) -> + OnStartup, OnShutdown, Label) -> gen_server:start_link( ?MODULE, {IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown}, []). + OnStartup, OnShutdown, Label}, []). %%-------------------------------------------------------------------- init({IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - {M,F,A} = OnStartup, OnShutdown}) -> + {M,F,A} = OnStartup, OnShutdown, Label}) -> process_flag(trap_exit, true), case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, {active, false}]) of @@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts, end, lists:duplicate(ConcurrentAcceptorCount, dummy)), {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg("started TCP listener on ~s:~p~n", - [inet_parse:ntoa(LIPAddress), LPort]), + error_logger:info_msg("started ~s on ~s:~p~n", + [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), - {ok, #state{sock=LSock, - on_startup = OnStartup, on_shutdown = OnShutdown}}; + {ok, #state{sock = LSock, + on_startup = OnStartup, on_shutdown = OnShutdown, + label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start TCP listener on ~s:~p - ~p~n", - [inet_parse:ntoa(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p~n", + [Label, inet_parse:ntoa(IPAddress), Port, Reason]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. @@ -86,11 +87,11 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) -> +terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> {ok, {IPAddress, Port}} = inet:sockname(LSock), gen_tcp:close(LSock), - error_logger:info_msg("stopped TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + error_logger:info_msg("stopped ~s on ~s:~p~n", + [Label, inet_parse:ntoa(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 901a0da3..d6bbac08 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -33,23 +33,23 @@ -behaviour(supervisor). --export([start_link/6, start_link/7]). +-export([start_link/7, start_link/8]). -export([init/1]). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback) -> + AcceptCallback, Label) -> start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, 1). + AcceptCallback, 1, Label). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount) -> + AcceptCallback, ConcurrentAcceptorCount, Label) -> supervisor:start_link( ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}). + AcceptCallback, ConcurrentAcceptorCount, Label}). init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}) -> + AcceptCallback, ConcurrentAcceptorCount, Label}) -> %% This is gross. The tcp_listener needs to know about the %% tcp_acceptor_sup, and the only way I can think of accomplishing %% that without jumping through hoops is to register the @@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, {tcp_listener, {tcp_listener, start_link, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, - OnStartup, OnShutdown]}, + OnStartup, OnShutdown, Label]}, transient, 100, worker, [tcp_listener]}]}}. |