summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-02 10:42:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-02 10:42:55 +0100
commit34db93f494f6b2d91d077001352b0d7ce5b68297 (patch)
treea2005547398a57dffc410f872495e00d256c321c
parentfb7a0b5c07e38bfb7ff0e7a00306fed145f0fb05 (diff)
parentc0784b916ae1c911bc9090abfb2569027cb7d6e1 (diff)
downloadrabbitmq-server-34db93f494f6b2d91d077001352b0d7ce5b68297.tar.gz
merge heads
-rw-r--r--.hgignore2
-rw-r--r--Makefile18
-rw-r--r--codegen.py20
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--include/rabbit.hrl5
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_basic.erl12
-rw-r--r--src/rabbit_binary_generator.erl61
-rw-r--r--src/rabbit_binary_parser.erl13
-rw-r--r--src/rabbit_channel.erl114
-rw-r--r--src/rabbit_exchange.erl1
-rw-r--r--src/rabbit_framing_channel.erl78
-rw-r--r--src/rabbit_limiter.erl5
-rw-r--r--src/rabbit_reader.erl275
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl100
19 files changed, 379 insertions, 378 deletions
diff --git a/.hgignore b/.hgignore
index 7b796b66..03b60914 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,7 +11,7 @@ syntax: regexp
^dist/
^include/rabbit_framing\.hrl$
^include/rabbit_framing_spec\.hrl$
-^src/rabbit_framing\.erl$
+^src/rabbit_framing_amqp.*\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
^basic.plt$
diff --git a/Makefile b/Makefile
index 5694292d..a97838cc 100644
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,7 @@ EBIN_DIR=ebin
INCLUDE_DIR=include
DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://www.rabbitmq.com/
@@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json
+AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json
+AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -99,11 +100,14 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py --ignore-conflicts header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
+$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@
+
+$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
@@ -128,7 +132,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
diff --git a/codegen.py b/codegen.py
index 1244aae1..230d785e 100644
--- a/codegen.py
+++ b/codegen.py
@@ -315,11 +315,16 @@ def genErl(spec):
methods = spec.allMethods()
printFileHeader()
- print """-module(rabbit_framing).
--include("rabbit_framing.hrl").
-
+ module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor)
+ if spec.revision != 0:
+ module = "%s_%d" % (module, spec.revision)
+ if module == "rabbit_framing_amqp_8_0":
+ module = "rabbit_framing_amqp_0_8"
+ print "-module(%s)." % module
+ print """-include("rabbit_framing.hrl").
+
+-export([version/0]).
-export([lookup_method_name/1]).
-
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
@@ -395,6 +400,7 @@ def genErl(spec):
print """
%% Method signatures
-ifdef(use_specs).
+-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
@@ -413,6 +419,10 @@ bitvalue(true) -> 1;
bitvalue(false) -> 0;
bitvalue(undefined) -> 0.
"""
+ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
+ if version == '{8, 0, 0}': version = '{0, 8, 0}'
+ print "version() -> %s." % (version)
+
for m in methods: genLookupMethodName(m)
print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})."
@@ -471,8 +481,6 @@ def genHrl(spec):
methods = spec.allMethods()
printFileHeader()
- print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
- print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index e53a97c2..a7d064f1 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -888,6 +888,10 @@
<listitem><para>Number of channels using the connection.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>protocol</term>
+ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 3fd52568..6364d60f 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,8 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
+-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
+ client_properties}).
-record(content,
{class_id,
@@ -44,6 +45,7 @@
properties_bin, %% either 'none', or an encoded properties binary
%% Note: at most one of properties and properties_bin can be
%% 'none' at once.
+ protocol, %% The protocol under which properties_bin was encoded
payload_fragments_rev %% list of binaries, in reverse order (!)
}).
@@ -74,6 +76,7 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
-define(MAX_WAIT, 16#ffffffff).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 18045b94..ada2c38e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -426,9 +426,9 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "~s~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6bf2f6db..870c119a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4]).
+ stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -124,6 +124,7 @@
-spec(ack/4 ::
(pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
-> 'ok').
+-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
@@ -367,6 +368,9 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+reject(QPid, MsgIds, Requeue, ChPid) ->
+ delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 67f0fcf5..ac5fb7f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -783,6 +783,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(case Requeue of
+ true -> requeue_and_run(AckTags, State);
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State #q { backing_queue_state = BQS1 }
+ end)
+ end;
+
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 03a19961..c76c01ac 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -97,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
+ protocol = none,
payload_fragments_rev = [BodyBin]}.
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 0e6ebe57..f0ec6180 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -41,12 +41,12 @@
% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
--export([build_simple_method_frame/2,
- build_simple_content_frames/3,
+-export([build_simple_method_frame/3,
+ build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
--export([ensure_content_encoded/1, clear_encoded_content/1]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
-import(lists).
@@ -56,20 +56,22 @@
-type(frame() :: [binary()]).
--spec(build_simple_method_frame/2 ::
- (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record())
+-spec(build_simple_method_frame/3 ::
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(),
+ rabbit_types:protocol())
-> frame()).
--spec(build_simple_content_frames/3 ::
+-spec(build_simple_content_frames/4 ::
(rabbit_channel:channel_number(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
-spec(encode_properties/2 ::
([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 ::
- (rabbit_types:content()) -> rabbit_types:encoded_content()).
+-spec(ensure_content_encoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol()) ->
+ rabbit_types:encoded_content()).
-spec(clear_encoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:unencoded_content()).
@@ -77,30 +79,24 @@
%%----------------------------------------------------------------------------
-build_simple_method_frame(ChannelInt, MethodRecord) ->
- MethodFields = rabbit_framing:encode_method_fields(MethodRecord),
+build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
+ MethodFields = Protocol:encode_method_fields(MethodRecord),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- {ClassId, MethodId} = rabbit_framing:method_id(MethodName),
+ {ClassId, MethodId} = Protocol:method_id(MethodName),
create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
-build_simple_content_frames(ChannelInt,
- #content{class_id = ClassId,
- properties = ContentProperties,
- properties_bin = ContentPropertiesBin,
- payload_fragments_rev = PayloadFragmentsRev},
- FrameMax) ->
- {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
+build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
+ #content{class_id = ClassId,
+ properties_bin = ContentPropertiesBin,
+ payload_fragments_rev = PayloadFragmentsRev} =
+ ensure_content_encoded(Content, Protocol),
+ {BodySize, ContentFrames} =
+ build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
HeaderFrame = create_frame(2, ChannelInt,
[<<ClassId:16, 0:16, BodySize:64>>,
- maybe_encode_properties(ContentProperties, ContentPropertiesBin)]),
+ ContentPropertiesBin]),
[HeaderFrame | ContentFrames].
-maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
- when is_binary(ContentPropertiesBin) ->
- ContentPropertiesBin;
-maybe_encode_properties(ContentProperties, none) ->
- rabbit_framing:encode_properties(ContentProperties).
-
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
BodyPayloadMax = if FrameMax == 0 ->
iolist_size(FragsRev);
@@ -283,13 +279,16 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ensure_content_encoded(Content = #content{properties_bin = PropsBin,
+ protocol = Protocol}, Protocol)
when PropsBin =/= 'none' ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}) ->
- Content #content{properties_bin = rabbit_framing:encode_properties(Props)}.
+ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ Content#content{properties_bin = Protocol:encode_properties(Props),
+ protocol = Protocol}.
-clear_encoded_content(Content = #content{properties_bin = none}) ->
+clear_encoded_content(Content = #content{properties_bin = none,
+ protocol = none}) ->
Content;
clear_encoded_content(Content = #content{properties = none}) ->
%% Only clear when we can rebuild the properties_bin later in
@@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) ->
%% one of properties and properties_bin can be 'none'
Content;
clear_encoded_content(Content = #content{}) ->
- Content#content{properties_bin = none}.
+ Content#content{properties_bin = none, protocol = none}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 69e34440..1d0a62af 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([ensure_content_decoded/2, clear_decoded_content/1]).
-import(lists).
@@ -45,8 +45,9 @@
-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
-spec(parse_properties/2 ::
([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 ::
- (rabbit_types:content()) -> rabbit_types:decoded_content()).
+-spec(ensure_content_decoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol())
+ -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
@@ -162,12 +163,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props})
+ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
when Props =/= 'none' ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin})
+ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
when is_binary(PropBin) ->
- Content#content{properties = rabbit_framing:decode_properties(
+ Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
clear_decoded_content(Content = #content{properties = none}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dafc3075..1928e21d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,11 +36,9 @@
-behaviour(gen_server2).
-export([start_link/6, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([flow_timeout/2]).
-
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -48,12 +46,9 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
-
--record(flow, {server, client, pending}).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
--define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -78,7 +73,7 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> pid()).
+ rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -87,9 +82,7 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
@@ -102,10 +95,8 @@
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- {ok, Pid} = gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []),
- Pid.
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost, CollectorPid], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -122,15 +113,9 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
-
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-flow_timeout(Pid, Ref) ->
- gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
-
list() ->
pg_local:get_members(rabbit_channels).
@@ -172,9 +157,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -225,27 +208,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-
-handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
- noreply(State);
-handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
- noreply(State#ch{state = running});
-handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
- flow_control(not Conserve, State);
-handle_cast({conserve_memory, _Conserve}, State) ->
- noreply(State);
-
-handle_cast({flow_timeout, Ref},
- State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
-handle_cast({flow_timeout, _Ref}, State) ->
- {noreply, State}.
+ noreply(State1#ch{next_tag = DeliveryTag + 1}).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -385,10 +348,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
- true -> {noreply, State};
- false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
- end;
+ {reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -405,10 +365,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -421,7 +377,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(
+ Content, rabbit_framing_amqp_0_9_1),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -638,6 +595,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
{noreply, State2};
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = Requeue},
+ _, State = #ch{ unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
@@ -853,48 +821,12 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}} = F})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Flow, client = Flow,
- pending = {_Ref, TRef}}})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(Flow, State)};
-handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
- rabbit_misc:protocol_error(
- command_invalid, "unsolicited channel.flow_ok", []);
-handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
-
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
%%----------------------------------------------------------------------------
-flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
- when Flow =:= not Active ->
- ok = clear_permission_cache(),
- noreply(issue_flow(Active, State));
-flow_control(Active, State = #ch{flow = F}) ->
- noreply(State#ch{flow = F#flow{server = Active}}).
-
-issue_flow(Active, State) ->
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = Active}),
- Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- State#ch{flow = #flow{server = Active, client = not Active,
- pending = {Ref, TRef}}}.
-
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -938,7 +870,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
WriterPid, Reason) ->
{_Close, ReplyCode, ReplyText} =
- rabbit_framing:lookup_amqp_exception(Reason),
+ rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -1031,7 +963,7 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7f7622b2..49f87a22 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -378,7 +378,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
{maybe_auto_delete(X), Bindings}.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index bc1a2a08..00b74ad0 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,21 +32,22 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/2, process/2, shutdown/1]).
+-export([start_link/3, process/2, shutdown/1]).
%% internal
--export([mainloop/1]).
+-export([mainloop/2]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs) ->
- spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of the
- %% channel or reader process terminates us too.
- process_flag(trap_exit, true),
- mainloop(apply(StartFun, StartArgs))
- end).
+start_link(StartFun, StartArgs, Protocol) ->
+ {ok, spawn_link(
+ fun () ->
+ %% we trap exits so that a normal termination of
+ %% the channel or reader process terminates us too.
+ process_flag(trap_exit, true),
+ {ok, ChannelPid} = apply(StartFun, StartArgs),
+ mainloop(ChannelPid, Protocol)
+ end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -72,37 +73,42 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid) ->
- {method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
- case rabbit_framing:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, ClassId));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid).
+mainloop(ChannelPid, Protocol) ->
+ case read_frame(ChannelPid) of
+ {method, MethodName, FieldsBin} ->
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid,
+ ClassId,
+ Protocol));
+ false -> rabbit_channel:do(ChannelPid, Method)
+ end,
+ ?MODULE:mainloop(ChannelPid, Protocol);
+ _ ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead",
+ [])
+ end.
-collect_content(ChannelPid, ClassId) ->
+collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
Payload = collect_content_payload(ChannelPid, BodySize, []),
#content{class_id = ClassId,
properties = none,
properties_bin = PropertiesBin,
+ protocol = Protocol,
payload_fragments_rev = Payload};
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId])
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
@@ -114,8 +120,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content body, got non content body frame instead",
- [])
+ unexpected_frame("expected content body, "
+ "got non content body frame instead",
+ [])
end.
+
+unexpected_frame(ExplanationFormat, Params) ->
+ rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 878af029..813ccc75 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
@@ -74,8 +74,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid, UnackedMsgCount) ->
- {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
- Pid.
+ gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
shutdown(undefined) ->
ok;
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b5514c82..46171aec 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,9 +39,9 @@
-export([init/1, mainloop/3]).
--export([server_properties/0]).
+-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/2]).
+-export([analyze_frame/3]).
-import(gen_tcp).
-import(fprof).
@@ -58,12 +58,12 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+ queue_collector, conserving_memory}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
+ protocol, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -142,6 +142,7 @@
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -208,6 +209,10 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -249,7 +254,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ protocol = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
@@ -294,6 +300,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -347,11 +355,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{conserving_memory = true}, Callback, Length) ->
+ %% TODO: pause heartbeat monitor
+ %% TODO: only do this after receiving a content-bearing method
+ State#v1{callback = {Callback, Length}, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_ref = Ref}.
terminate(Explanation, State = #v1{connection_state = running}) ->
{normal, send_exception(State, 0,
@@ -360,6 +371,14 @@ terminate(Explanation, State = #v1{connection_state = running}) ->
terminate(_Explanation, State) ->
{force, State}.
+internal_conserve_memory(false, State = #v1{conserving_memory = true,
+ callback = {Callback, Length},
+ recv_ref = none}) ->
+ %% TODO: resume heartbeat monitor
+ switch_callback(State#v1{conserving_memory = false}, Callback, Length);
+internal_conserve_memory(Conserve, State) ->
+ State#v1{conserving_memory = Conserve}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but
@@ -437,7 +456,9 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ queue_collector = Collector,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
%% Spec says "Exclusive queues may only be accessed by the current
@@ -445,16 +466,18 @@ maybe_close(State = #v1{connection_state = closing,
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
rabbit_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
close_connection(State);
_ -> State
end;
maybe_close(State) ->
State.
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -462,20 +485,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
State;
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
end;
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -516,17 +539,20 @@ handle_frame(Type, Channel, Payload, State) ->
end
end.
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
heartbeat;
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
@@ -543,54 +569,61 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% the 0-8 spec, confusingly, defines the version as 8-0
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7}.
+
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
%%--------------------------------------------------------------------------
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
try
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:Reason ->
CompleteReason = case Reason of
@@ -612,14 +645,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
@@ -645,46 +678,31 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
frame_max = FrameMax}}
end;
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
+
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(Sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -697,23 +715,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
%%--------------------------------------------------------------------------
@@ -747,6 +750,10 @@ i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
length(all_channels());
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -770,11 +777,13 @@ send_to_new_channel(Channel, AnalyzedFrame,
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
+ vhost = VHost,
+ protocol = Protocol}} = State,
+ {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
+ {ok, ChPid} = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector],
+ Protocol),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
@@ -790,25 +799,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
false -> close_channel(Channel, State)
end,
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ _ -> Protocol:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -823,22 +834,16 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 090c714b..56aca1d6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -504,8 +504,10 @@ test_content_framing(FrameMax, BodyBin) ->
rabbit_binary_generator:build_simple_content_frames(
1,
rabbit_binary_generator:ensure_content_encoded(
- rabbit_basic:build_content(#'P_basic'{}, BodyBin)),
- FrameMax),
+ rabbit_basic:build_content(#'P_basic'{}, BodyBin),
+ rabbit_framing_amqp_0_9_1),
+ FrameMax,
+ rabbit_framing_amqp_0_9_1),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
@@ -938,8 +940,8 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
+ <<"user">>, <<"/">>, self()),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1068,8 +1070,8 @@ test_memory_pressure_sync(Ch, Writer) ->
test_memory_pressure_spawn() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
+ <<"user">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
@@ -1142,8 +1144,8 @@ test_memory_pressure() ->
alarm_handler:set_alarm({vm_memory_high_watermark, []}),
Me = self(),
Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4,
+ <<"user">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch4, #'channel.open'{}),
MRef4 = erlang:monitor(process, Ch4),
Writer4 ! sync,
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2e492b80..3aaf1917 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,8 +39,8 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, user/0,
- error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+ binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
+ user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -133,6 +133,8 @@
-type(connection() :: pid()).
+-type(protocol() :: atom()).
+
-type(user() ::
#user{username :: rabbit_access_control:username(),
password :: rabbit_access_control:password()}).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 80602038..f90ee734 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,14 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([start/4, start_link/4, shutdown/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
--export([internal_send_command/3, internal_send_command/5]).
+-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
--record(wstate, {sock, channel, frame_max}).
+-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -48,14 +48,14 @@
-ifdef(use_specs).
--spec(start/3 ::
+-spec(start/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> pid()).
--spec(start_link/3 ::
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> pid()).
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -70,29 +70,31 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
--spec(internal_send_command/3 ::
+-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- rabbit_framing:amqp_method_record())
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
-> 'ok').
--spec(internal_send_command/5 ::
+-spec(internal_send_command/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax) ->
- spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
-
-start_link(Sock, Channel, FrameMax) ->
- spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+start(Sock, Channel, FrameMax, Protocol) ->
+ {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}]).
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+start_link(Sock, Channel, FrameMax, Protocol) ->
+ {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
mainloop(State) ->
receive
@@ -102,35 +104,40 @@ mainloop(State) ->
end.
handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
State;
handle_message({send_command, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
State;
handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -171,30 +178,32 @@ shutdown(W) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord) ->
+assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord).
+ rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
+ Protocol).
-assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = rabbit_framing:method_has_content(MethodName), % assertion
+ true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord),
+ Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
-internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -214,13 +223,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
+internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ Content, FrameMax, Protocol)),
ok.
port_cmd(Sock, Data) ->