summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-02 17:44:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-02 17:44:09 +0100
commitab5e0e4d8e749be10ed2b2dcea1a4ef92eda4388 (patch)
treef05ae972675a2f7c82e0be3eb978d6a3bd6e3c4f
parent3920e36c2b6b4a0302162998396e1ba35eb1d7a2 (diff)
parent4f4640a08f676cca8605299101265f6fd82277b8 (diff)
downloadrabbitmq-server-ab5e0e4d8e749be10ed2b2dcea1a4ef92eda4388.tar.gz
Merged heads
-rw-r--r--codegen.py8
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_binary_generator.erl28
-rw-r--r--src/rabbit_binary_parser.erl10
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_writer.erl2
9 files changed, 44 insertions, 39 deletions
diff --git a/codegen.py b/codegen.py
index 9596f5b1..9eb6fca2 100644
--- a/codegen.py
+++ b/codegen.py
@@ -316,7 +316,7 @@ def genErl(spec):
printFileHeader()
module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor)
- if spec.revision != '0':
+ if spec.revision != 0:
module = "%s_%d" % (module, spec.revision)
if module == "rabbit_framing_amqp_8_0":
module = "rabbit_framing_amqp_0_8"
@@ -336,6 +336,7 @@ def genErl(spec):
-export([encode_properties/1]).
-export([lookup_amqp_exception/1]).
-export([amqp_exception/1]).
+-export([version/0]).
bitvalue(true) -> 1;
bitvalue(false) -> 0;
@@ -355,6 +356,7 @@ bitvalue(undefined) -> 0.
-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-spec(version/0 :: () -> {integer, integer, integer}).
-endif. % use_specs
"""
for m in methods: genLookupMethodName(m)
@@ -396,6 +398,10 @@ bitvalue(undefined) -> 0.
for(c,v,cls) in spec.constants: genAmqpException(c,v,cls)
print "amqp_exception(_Code) -> undefined."
+ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
+ if version == '{8, 0, 0}': version = '{0, 8, 0}'
+ print "version() -> %s." % (version)
+
def genHrl(spec):
def erlType(domain):
return erlangTypeMap[spec.resolveDomain(domain)]
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 1bb89af9..8c713a50 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,8 +36,8 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {protocol, protocol_name, user, timeout_sec, frame_max,
- vhost, client_properties}).
+-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
+ client_properties}).
-record(content,
{class_id,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 7b581b8d..4a1d50df 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -92,8 +92,9 @@ from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index d159f309..75cd643c 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -42,11 +42,11 @@
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-export([build_simple_method_frame/3,
- build_simple_content_frames/3,
+ build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
--export([ensure_content_encoded/1, clear_encoded_content/1]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
-import(lists).
@@ -58,13 +58,14 @@
-spec(build_simple_method_frame/3 ::
(channel_number(), amqp_method_record(), protocol()) -> frame()).
--spec(build_simple_content_frames/3 ::
- (channel_number(), content(), non_neg_integer()) -> [frame()]).
+-spec(build_simple_content_frames/4 ::
+ (channel_number(), content(), non_neg_integer(), protocol()) ->
+ [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (amqp_table()) -> binary()).
-spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 :: (content()) -> encoded_content()).
+-spec(ensure_content_encoded/2 :: (content(), protocol()) -> encoded_content()).
-spec(clear_encoded_content/1 :: (content()) -> unencoded_content()).
-endif.
@@ -82,18 +83,18 @@ build_simple_content_frames(ChannelInt,
properties = ContentProperties,
properties_bin = ContentPropertiesBin,
payload_fragments_rev = PayloadFragmentsRev},
- FrameMax) ->
+ FrameMax, Protocol) ->
{BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
HeaderFrame = create_frame(2, ChannelInt,
[<<ClassId:16, 0:16, BodySize:64>>,
- maybe_encode_properties(ContentProperties, ContentPropertiesBin)]),
+ maybe_encode_properties(ContentProperties, ContentPropertiesBin, Protocol)]),
[HeaderFrame | ContentFrames].
-maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
+maybe_encode_properties(_ContentProperties, ContentPropertiesBin, _Protocol)
when is_binary(ContentPropertiesBin) ->
ContentPropertiesBin;
-maybe_encode_properties(ContentProperties, none) ->
- rabbit_framing_amqp_0_9_1:encode_properties(ContentProperties).
+maybe_encode_properties(ContentProperties, none, Protocol) ->
+ Protocol:encode_properties(ContentProperties).
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
BodyPayloadMax = if FrameMax == 0 ->
@@ -277,12 +278,11 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ensure_content_encoded(Content = #content{properties_bin = PropsBin}, _Protocol)
when PropsBin =/= 'none' ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}) ->
- Content #content{properties_bin =
- rabbit_framing_amqp_0_9_1:encode_properties(Props)}.
+ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ Content#content{properties_bin = Protocol:encode_properties(Props)}.
clear_encoded_content(Content = #content{properties_bin = none}) ->
Content;
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 7a32acae..633be6f0 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([ensure_content_decoded/2, clear_decoded_content/1]).
-import(lists).
@@ -44,7 +44,7 @@
-spec(parse_table/1 :: (binary()) -> amqp_table()).
-spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 :: (content()) -> decoded_content()).
+-spec(ensure_content_decoded/2 :: (content(), protocol()) -> decoded_content()).
-spec(clear_decoded_content/1 :: (content()) -> undecoded_content()).
-endif.
@@ -159,12 +159,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props})
+ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
when Props =/= 'none' ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin})
+ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
when is_binary(PropBin) ->
- Content#content{properties = rabbit_framing_amqp_0_9_1:decode_properties(
+ Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
clear_decoded_content(Content = #content{properties = none}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da91bef8..c3440f84 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -414,7 +414,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ DecodedContent =
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 72d95c7b..9fbf7ce8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -560,14 +560,14 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
%% * The server MUST provide a protocol version that is lower than or
%% equal to that requested by the client in the protocol header.
handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
- start_connection({0, 9, 1}, amqp_0_9_1, State);
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
- start_connection({0, 9, 0}, amqp_0_9_1, State);
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
%% the 0-8 spec, confusingly, defines the version as 8-0
handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
- start_connection({8, 0, 0}, amqp_0_8, State);
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_version, A, B, C, D});
@@ -582,12 +582,8 @@ handle_input(Callback, Data, _State) ->
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
%% are similar enough that clients will be happy with either.
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
- ProtocolName,
+ Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Protocol = case ProtocolName of
- amqp_0_9_1 -> rabbit_framing_amqp_0_9_1;
- amqp_0_8 -> rabbit_framing_amqp_0_8
- end,
Start = #'connection.start'{ version_major = ProtocolMajor,
version_minor = ProtocolMinor,
server_properties = server_properties(),
@@ -596,8 +592,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
ok = send_on_channel0(Sock, Start, Protocol),
{State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol,
- protocol_name = ProtocolName},
+ protocol = Protocol},
connection_state = starting},
frame_header, 7}.
@@ -736,8 +731,8 @@ i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
length(all_channels());
-i(protocol, #v1{connection = #connection{protocol_name = ProtocolName}}) ->
- ProtocolName;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 960d9a9c..53f73b9d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -361,7 +361,8 @@ test_content_framing(FrameMax, Fragments) ->
1,
#content{class_id = 0, properties_bin = <<>>,
payload_fragments_rev = Fragments},
- FrameMax),
+ FrameMax,
+ rabbit_framing_amqp_0_9_1),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 33e12a24..6bdc1742 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -174,7 +174,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->