summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-07-02 21:18:19 +0000
committerTed Ross <tross@apache.org>2013-07-02 21:18:19 +0000
commitd48d6d60421c14c28720f696ffeaeaa9aa096180 (patch)
tree404df4dbb688c091e01a4bb6b0696889ab253c88
parent706370f431afa3e812b94878fd9bc6a09a0920d5 (diff)
downloadqpid-python-d48d6d60421c14c28720f696ffeaeaa9aa096180.tar.gz
QPID-4974 - Generalized the mechanisms for composing and parsing AMQP-encoded fields.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1499115 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/dispatch/CMakeLists.txt1
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/amqp.h80
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/compose.h12
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/iterator.h26
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/parse.h195
-rw-r--r--qpid/extras/dispatch/src/agent.c40
-rw-r--r--qpid/extras/dispatch/src/compose.c59
-rw-r--r--qpid/extras/dispatch/src/iterator.c309
-rw-r--r--qpid/extras/dispatch/src/message.c1
-rw-r--r--qpid/extras/dispatch/src/parse.c379
10 files changed, 762 insertions, 340 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt
index fb62be2eeb..e0dbfcfc3f 100644
--- a/qpid/extras/dispatch/CMakeLists.txt
+++ b/qpid/extras/dispatch/CMakeLists.txt
@@ -86,6 +86,7 @@ set(server_SOURCES
src/iterator.c
src/log.c
src/message.c
+ src/parse.c
src/posix/threading.c
src/python_embedded.c
src/router_node.c
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/amqp.h b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h
new file mode 100644
index 0000000000..9babeb930d
--- /dev/null
+++ b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h
@@ -0,0 +1,80 @@
+#ifndef __dispatch_amqp_h__
+#define __dispatch_amqp_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * AMQP Performative Tags
+ */
+#define DX_PERFORMATIVE_HEADER 0x70
+#define DX_PERFORMATIVE_DELIVERY_ANNOTATIONS 0x71
+#define DX_PERFORMATIVE_MESSAGE_ANNOTATIONS 0x72
+#define DX_PERFORMATIVE_PROPERTIES 0x73
+#define DX_PERFORMATIVE_APPLICATION_PROPERTIES 0x74
+#define DX_PERFORMATIVE_BODY_DATA 0x75
+#define DX_PERFORMATIVE_BODY_AMQP_SEQUENCE 0x76
+#define DX_PERFORMATIVE_BODY_AMQP_VALUE 0x77
+#define DX_PERFORMATIVE_FOOTER 0x78
+
+
+/**
+ * AMQP Type Tags
+ */
+#define DX_AMQP_NULL 0x40
+#define DX_AMQP_BOOLEAN 0x56
+#define DX_AMQP_TRUE 0x41
+#define DX_AMQP_FALSE 0x42
+#define DX_AMQP_UBYTE 0x50
+#define DX_AMQP_USHORT 0x60
+#define DX_AMQP_UINT 0x70
+#define DX_AMQP_SMALLUINT 0x52
+#define DX_AMQP_UINT0 0x43
+#define DX_AMQP_ULONG 0x80
+#define DX_AMQP_SMALLULONG 0x53
+#define DX_AMQP_ULONG0 0x44
+#define DX_AMQP_BYTE 0x51
+#define DX_AMQP_SHORT 0x61
+#define DX_AMQP_INT 0x71
+#define DX_AMQP_SMALLINT 0x54
+#define DX_AMQP_LONG 0x81
+#define DX_AMQP_SMALLLONG 0x55
+#define DX_AMQP_FLOAT 0x72
+#define DX_AMQP_DOUBLE 0x82
+#define DX_AMQP_DECIMAL32 0x74
+#define DX_AMQP_DECIMAL64 0x84
+#define DX_AMQP_DECIMAL128 0x94
+#define DX_AMQP_UTF32 0x73
+#define DX_AMQP_TIMESTAMP 0x83
+#define DX_AMQP_UUID 0x98
+#define DX_AMQP_VBIN8 0xa0
+#define DX_AMQP_VBIN32 0xb0
+#define DX_AMQP_STR8_UTF8 0xa1
+#define DX_AMQP_STR32_UTF8 0xb1
+#define DX_AMQP_SYM8 0xa3
+#define DX_AMQP_SYM32 0xb3
+#define DX_AMQP_LIST0 0x45
+#define DX_AMQP_LIST8 0xc0
+#define DX_AMQP_LIST32 0xd0
+#define DX_AMQP_MAP8 0xc1
+#define DX_AMQP_MAP32 0xd1
+#define DX_AMQP_ARRAY8 0xe0
+#define DX_AMQP_ARRAY32 0xf0
+
+#endif
+
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/compose.h b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
index 514b450e58..7d668f0ac0 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/compose.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
@@ -24,16 +24,6 @@
typedef struct dx_composed_field_t dx_composed_field_t;
-#define DX_PERFORMATIVE_HEADER 0x70
-#define DX_PERFORMATIVE_DELIVERY_ANNOTATIONS 0x71
-#define DX_PERFORMATIVE_MESSAGE_ANNOTATIONS 0x72
-#define DX_PERFORMATIVE_PROPERTIES 0x73
-#define DX_PERFORMATIVE_APPLICATION_PROPERTIES 0x74
-#define DX_PERFORMATIVE_BODY_DATA 0x75
-#define DX_PERFORMATIVE_BODY_AMQP_SEQUENCE 0x76
-#define DX_PERFORMATIVE_BODY_AMQP_VALUE 0x77
-#define DX_PERFORMATIVE_FOOTER 0x78
-
/**
* Begin composing a new field for a message. The new field can be standalone or
* appended onto an existing field.
@@ -43,7 +33,7 @@ typedef struct dx_composed_field_t dx_composed_field_t;
* create a standalone field.
* @return A pointer to the newly created field.
*/
-dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend);
+dx_composed_field_t *dx_compose(uint64_t performative, dx_composed_field_t *extend);
/**
* Free the resources associated with a composed field.
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
index 2412995432..5269abd4b2 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
@@ -19,6 +19,7 @@
* under the License.
*/
+#include <stdint.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/iovec.h>
@@ -132,6 +133,14 @@ unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter);
int dx_field_iterator_end(dx_field_iterator_t *iter);
/**
+ * Return a sub-iterator that equals the supplied iterator except that it
+ * starts at the supplied iterator's current position.
+ */
+dx_field_iterator_t *dx_field_iterator_sub(dx_field_iterator_t *iter, uint32_t length);
+
+void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length);
+
+/**
* Compare an input string to the iterator's view. Return true iff they are equal.
*/
int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string);
@@ -149,15 +158,14 @@ int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix);
*/
unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter);
-
+/**
+ * Return the contents of this iter into an iovec structure. This is used in a
+ * scatter/gather IO mechanism. If the iterator spans multiple physical buffers,
+ * the iovec structure will contain one pointer per buffer.
+ *
+ * @param iter A field iterator
+ * @return An iovec structure that references the data in the iterator's buffers.
+ */
dx_iovec_t *dx_field_iterator_iovec(const dx_field_iterator_t *iter);
-typedef struct dx_field_map_t dx_field_map_t;
-
-dx_field_map_t *dx_field_map(dx_field_iterator_t *iter, int string_keys_only);
-void dx_field_map_free(dx_field_map_t *map);
-dx_field_iterator_t *dx_field_map_by_key(dx_field_map_t *map, const char *key);
-
-dx_field_iterator_t *dx_field_raw(dx_field_iterator_t *iter);
-
#endif
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/parse.h b/qpid/extras/dispatch/include/qpid/dispatch/parse.h
new file mode 100644
index 0000000000..087a440c4e
--- /dev/null
+++ b/qpid/extras/dispatch/include/qpid/dispatch/parse.h
@@ -0,0 +1,195 @@
+#ifndef __dispatch_parse_h__
+#define __dispatch_parse_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/iterator.h>
+
+typedef struct dx_parsed_field_t dx_parsed_field_t;
+
+/**
+ * Parse a field delimited by a field iterator.
+ *
+ * @param iter Field iterator for the field being parsed
+ * @return A pointer to the newly created field.
+ */
+dx_parsed_field_t *dx_parse(dx_field_iterator_t *iter);
+
+/**
+ * Free the resources associated with a parsed field.
+ *
+ * @param A field pointer returned by dx_parse.
+ */
+void dx_parse_free(dx_parsed_field_t *field);
+
+/**
+ * Check to see if the field parse was successful (i.e. the field was
+ * well-formed).
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return true iff the field was well-formed and successfully parsed.
+ */
+int dx_parse_ok(dx_parsed_field_t *field);
+
+/**
+ * Return the text of the error describing the parse error if the field
+ * is not well-formed.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return a null-terminated string describing the parse failure.
+ */
+const char *dx_parse_error(dx_parsed_field_t *field);
+
+/**
+ * Return the AMQP tag for the parsed (and well-formed) field.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The tag (see amqp.h) that indicates the type of the field.
+ */
+uint8_t dx_parse_tag(dx_parsed_field_t *field);
+
+/**
+ * Return an iterator for the raw content of the field. This is useful
+ * only for scalar fields. It is not appropriate for compound fields.
+ * For compound fields, use the sub-field functions instead.
+ *
+ * The returned iterator describes the raw content of the field, and can be
+ * used for comparison, indexing, or copying.
+ *
+ * IMPORTANT: The returned iterator is owned by the field and *must not* be
+ * freed by the caller of this function.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return A field iterator that describes the field's raw content.
+ */
+dx_field_iterator_t *dx_parse_raw(dx_parsed_field_t *field);
+
+/**
+ * Return the raw content as an unsigned integer up to 32-bits. This is
+ * valid only for scalar fields of a fixed size of 4-octets or fewer.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The raw content of the field cast as a uint32_t.
+ */
+uint32_t dx_parse_as_uint(dx_parsed_field_t *field);
+
+/**
+ * Return the raw content as an unsigned integer up to 64-bits. This is
+ * valid only for scalar fields of a fixed size of 8-octets or fewer.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The raw content of the field cast as a uint64_t.
+ */
+uint64_t dx_parse_as_ulong(dx_parsed_field_t *field);
+
+/**
+ * Return the raw content as a signed integer up to 32-bits. This is
+ * valid only for scalar fields of a fixed size of 4-octets or fewer.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The raw content of the field cast as an int32_t.
+ */
+int32_t dx_parse_as_int(dx_parsed_field_t *field);
+
+/**
+ * Return the raw content as a signed integer up to 64-bits. This is
+ * valid only for scalar fields of a fixed size of 8-octets or fewer.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The raw content of the field cast as an int64_t.
+ */
+int64_t dx_parse_as_long(dx_parsed_field_t *field);
+
+/**
+ * Return the number of sub-field in a compound field. If the field is
+ * a list or array, this is the number of items in the list/array. If
+ * the field is a map, this is the number of key/value pairs in the map
+ * (i.e. half the number of actual sub-field in the map).
+ *
+ * For scalar fields, this function will return zero.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @return The number of sub-fields in the field.
+ */
+uint32_t dx_parse_sub_count(dx_parsed_field_t *field);
+
+/**
+ * Return a dx_parsed_field_t for the idx'th key in a map field.
+ * If 'field' is not a map, or idx is equal-to or greater-than the number
+ * of sub-fields in field, this function will return NULL.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the
+ * parent field. It *must not* be freed by the caller.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @param idx The index of the desired sub-field (in range 0..sub_count)
+ * @return A pointer to the parsed sub-field
+ */
+dx_parsed_field_t *dx_parse_sub_key(dx_parsed_field_t *field, uint32_t idx);
+
+/**
+ * Return a dx_parsed_field_t for the idx'th value in a compound field.
+ * If idx is equal-to or greater-than the number of sub-fields in field,
+ * this function will return NULL.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the
+ * parent field. It *must not* be freed by the caller.
+ *
+ * @param field The field pointer returned by dx_parse.
+ * @param idx The index of the desired sub-field (in range 0..sub_count)
+ * @return A pointer to the parsed sub-field
+ */
+dx_parsed_field_t *dx_parse_sub_value(dx_parsed_field_t *field, uint32_t idx);
+
+/**
+ * Convenience Function - Return true iff the field is a map.
+ *
+ * @param field The field pointer returned by dx_parse[_sub_{value,key}]
+ * @return non-zero if the condition is mat.
+ */
+int dx_parse_is_map(dx_parsed_field_t *field);
+
+/**
+ * Convenience Function - Return true iff the field is a list.
+ *
+ * @param field The field pointer returned by dx_parse[_sub_{value,key}]
+ * @return non-zero if the condition is mat.
+ */
+int dx_parse_is_list(dx_parsed_field_t *field);
+
+/**
+ * Convenience Function - Return true iff the field is a scalar type.
+ *
+ * @param field The field pointer returned by dx_parse[_sub_{value,key}]
+ * @return non-zero if the condition is mat.
+ */
+int dx_parse_is_scalar(dx_parsed_field_t *field);
+
+/**
+ * Convenience Function - Return the value for a key in a map.
+ *
+ * @param field The field pointer returned by dx_parse[_sub_{value,key}]
+ * @param key The key to search for in the map.
+ * @return The value field corresponding to the key or NULL.
+ */
+dx_parsed_field_t *dx_parse_value_by_key(dx_parsed_field_t *field, const char *key);
+
+#endif
+
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 70f31d1522..88b2cf2bcd 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -30,6 +30,8 @@
#include <qpid/dispatch/router.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/amqp.h>
#include <string.h>
#include <stdio.h>
@@ -61,13 +63,13 @@ typedef struct {
static char *log_module = "AGENT";
-static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map, dx_field_iterator_t *reply_to)
+static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
{
- dx_field_iterator_t *cls = dx_field_map_by_key(map, "type");
+ dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
if (cls == 0)
return;
- dx_field_iterator_t *cls_string = dx_field_raw(cls);
+ dx_field_iterator_t *cls_string = dx_parse_raw(cls);
const dx_agent_class_t *cls_record;
hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
if (cls_record == 0)
@@ -168,20 +170,39 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
return;
//
- // Try to get a map-view of the application-properties. Exit if it is not a map-value.
+ // Try to get a map-view of the application-properties.
//
- dx_field_map_t *map = dx_field_map(ap, 1);
+ dx_parsed_field_t *map = dx_parse(ap);
if (map == 0) {
dx_field_iterator_free(ap);
return;
}
//
+ // Exit if there was a parsing error.
+ //
+ if (!dx_parse_ok(map)) {
+ dx_log(log_module, LOG_TRACE, "Received unparsable App Properties: %s", dx_parse_error(map));
+ dx_field_iterator_free(ap);
+ dx_parse_free(map);
+ return;
+ }
+
+ //
+ // Exit if it is not a map.
+ //
+ if (!dx_parse_is_map(map)) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(map);
+ return;
+ }
+
+ //
// Get an iterator for the "operation" field in the map. Exit if the key is not found.
//
- dx_field_iterator_t *operation = dx_field_map_by_key(map, "operation");
+ dx_parsed_field_t *operation = dx_parse_value_by_key(map, "operation");
if (operation == 0) {
- dx_field_map_free(map);
+ dx_parse_free(map);
dx_field_iterator_free(ap);
return;
}
@@ -189,12 +210,11 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
//
// Dispatch the operation to the appropriate handler
//
- dx_field_iterator_t *operation_string = dx_field_raw(operation);
+ dx_field_iterator_t *operation_string = dx_parse_raw(operation);
if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET"))
dx_agent_process_get(agent, map, reply_to);
- dx_field_iterator_free(operation_string);
- dx_field_map_free(map);
+ dx_parse_free(map);
dx_field_iterator_free(ap);
dx_field_iterator_free(reply_to);
}
diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c
index f20fd53251..0f298c8217 100644
--- a/qpid/extras/dispatch/src/compose.c
+++ b/qpid/extras/dispatch/src/compose.c
@@ -20,6 +20,7 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/amqp.h>
#include "message_private.h"
#include "compose_private.h"
#include <memory.h>
@@ -143,9 +144,9 @@ static void dx_overwrite_32(dx_field_location_t *field, uint32_t value)
static void dx_compose_start_composite(dx_composed_field_t *field, int isMap)
{
if (isMap)
- dx_insert_8(field, 0xd1); // map32
+ dx_insert_8(field, DX_AMQP_MAP32);
else
- dx_insert_8(field, 0xd0); // list32
+ dx_insert_8(field, DX_AMQP_LIST32);
//
// Push a composite descriptor on the field stack
@@ -204,7 +205,7 @@ static void dx_compose_end_composite(dx_composed_field_t *field)
}
-dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend)
+dx_composed_field_t *dx_compose(uint64_t performative, dx_composed_field_t *extend)
{
dx_composed_field_t *field = extend;
@@ -219,8 +220,8 @@ dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *exten
DEQ_INIT(field->fieldStack);
}
- dx_insert(field, (const uint8_t*) "\x00\x53", 2);
- dx_insert_8(field, performative);
+ dx_insert_8(field, 0x00);
+ dx_compose_insert_ulong(field, performative);
return field;
}
@@ -270,14 +271,14 @@ void dx_compose_end_map(dx_composed_field_t *field)
void dx_compose_insert_null(dx_composed_field_t *field)
{
- dx_insert_8(field, 0x40);
+ dx_insert_8(field, DX_AMQP_NULL);
bump_count(field);
}
void dx_compose_insert_bool(dx_composed_field_t *field, int value)
{
- dx_insert_8(field, value ? 0x41 : 0x42);
+ dx_insert_8(field, value ? DX_AMQP_TRUE : DX_AMQP_FALSE);
bump_count(field);
}
@@ -285,12 +286,12 @@ void dx_compose_insert_bool(dx_composed_field_t *field, int value)
void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value)
{
if (value == 0) {
- dx_insert_8(field, 0x43); // uint0
+ dx_insert_8(field, DX_AMQP_UINT0);
} else if (value < 256) {
- dx_insert_8(field, 0x52); // smalluint
+ dx_insert_8(field, DX_AMQP_SMALLUINT);
dx_insert_8(field, (uint8_t) value);
} else {
- dx_insert_8(field, 0x70); // uint
+ dx_insert_8(field, DX_AMQP_UINT);
dx_insert_32(field, value);
}
bump_count(field);
@@ -300,12 +301,12 @@ void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value)
void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value)
{
if (value == 0) {
- dx_insert_8(field, 0x44); // ulong0
+ dx_insert_8(field, DX_AMQP_ULONG0);
} else if (value < 256) {
- dx_insert_8(field, 0x53); // smallulong
+ dx_insert_8(field, DX_AMQP_SMALLULONG);
dx_insert_8(field, (uint8_t) value);
} else {
- dx_insert_8(field, 0x80); // ulong
+ dx_insert_8(field, DX_AMQP_ULONG);
dx_insert_64(field, value);
}
bump_count(field);
@@ -315,10 +316,10 @@ void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value)
void dx_compose_insert_int(dx_composed_field_t *field, int32_t value)
{
if (value >= -128 && value <= 127) {
- dx_insert_8(field, 0x54); // smallint
+ dx_insert_8(field, DX_AMQP_SMALLINT);
dx_insert_8(field, (uint8_t) value);
} else {
- dx_insert_8(field, 0x71); // int
+ dx_insert_8(field, DX_AMQP_INT);
dx_insert_32(field, (uint32_t) value);
}
bump_count(field);
@@ -328,10 +329,10 @@ void dx_compose_insert_int(dx_composed_field_t *field, int32_t value)
void dx_compose_insert_long(dx_composed_field_t *field, int64_t value)
{
if (value >= -128 && value <= 127) {
- dx_insert_8(field, 0x55); // smalllong
+ dx_insert_8(field, DX_AMQP_SMALLLONG);
dx_insert_8(field, (uint8_t) value);
} else {
- dx_insert_8(field, 0x81); // long
+ dx_insert_8(field, DX_AMQP_LONG);
dx_insert_64(field, (uint64_t) value);
}
bump_count(field);
@@ -340,7 +341,7 @@ void dx_compose_insert_long(dx_composed_field_t *field, int64_t value)
void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value)
{
- dx_insert_8(field, 0x83); // timestamp
+ dx_insert_8(field, DX_AMQP_TIMESTAMP);
dx_insert_64(field, value);
bump_count(field);
}
@@ -348,7 +349,7 @@ void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value)
void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value)
{
- dx_insert_8(field, 0x98); // uuid
+ dx_insert_8(field, DX_AMQP_UUID);
dx_insert(field, value, 16);
bump_count(field);
}
@@ -357,10 +358,10 @@ void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value)
void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len)
{
if (len < 256) {
- dx_insert_8(field, 0xa0); // vbin8
+ dx_insert_8(field, DX_AMQP_VBIN8);
dx_insert_8(field, (uint8_t) len);
} else {
- dx_insert_8(field, 0xb0); // vbin32
+ dx_insert_8(field, DX_AMQP_VBIN32);
dx_insert_32(field, len);
}
dx_insert(field, value, len);
@@ -385,10 +386,10 @@ void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list
// Supply the appropriate binary tag for the length.
//
if (len < 256) {
- dx_insert_8(field, 0xa0); // vbin8
+ dx_insert_8(field, DX_AMQP_VBIN8);
dx_insert_8(field, (uint8_t) len);
} else {
- dx_insert_8(field, 0xb0); // vbin32
+ dx_insert_8(field, DX_AMQP_VBIN32);
dx_insert_32(field, len);
}
@@ -409,10 +410,10 @@ void dx_compose_insert_string(dx_composed_field_t *field, const char *value)
uint32_t len = strlen(value);
if (len < 256) {
- dx_insert_8(field, 0xa1); // str8-utf8
+ dx_insert_8(field, DX_AMQP_STR8_UTF8);
dx_insert_8(field, (uint8_t) len);
} else {
- dx_insert_8(field, 0xb1); // str32-utf8
+ dx_insert_8(field, DX_AMQP_STR32_UTF8);
dx_insert_32(field, len);
}
dx_insert(field, (const uint8_t*) value, len);
@@ -432,10 +433,10 @@ void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iter
dx_field_iterator_reset(iter);
if (len < 256) {
- dx_insert_8(field, 0xa1); // str8-utf8
+ dx_insert_8(field, DX_AMQP_STR8_UTF8);
dx_insert_8(field, (uint8_t) len);
} else {
- dx_insert_8(field, 0xb1); // str32-utf8
+ dx_insert_8(field, DX_AMQP_STR32_UTF8);
dx_insert_32(field, len);
}
@@ -453,10 +454,10 @@ void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value)
uint32_t len = strlen(value);
if (len < 256) {
- dx_insert_8(field, 0xa3); // sym8
+ dx_insert_8(field, DX_AMQP_SYM8);
dx_insert_8(field, (uint8_t) len);
} else {
- dx_insert_8(field, 0xb3); // sym32
+ dx_insert_8(field, DX_AMQP_SYM32);
dx_insert_32(field, len);
}
dx_insert(field, (const uint8_t*) value, len);
diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c
index 3998cd9f94..43c3b755d0 100644
--- a/qpid/extras/dispatch/src/iterator.c
+++ b/qpid/extras/dispatch/src/iterator.c
@@ -25,7 +25,7 @@
#include <stdio.h>
#include <string.h>
-static const char *log_module = "FIELD";
+//static const char *log_module = "FIELD";
typedef enum {
MODE_TO_END,
@@ -47,35 +47,12 @@ struct dx_field_iterator_t {
unsigned char prefix;
int at_prefix;
int view_prefix;
- unsigned char tag;
};
ALLOC_DECLARE(dx_field_iterator_t);
ALLOC_DEFINE(dx_field_iterator_t);
-typedef struct dx_field_pair_t {
- DEQ_LINKS(struct dx_field_pair_t);
- dx_field_iterator_t *key_iter;
- dx_field_iterator_t *value_iter;
-} dx_field_pair_t;
-
-DEQ_DECLARE(dx_field_pair_t, dx_field_pair_list_t);
-
-ALLOC_DECLARE(dx_field_pair_t);
-ALLOC_DEFINE(dx_field_pair_t);
-
-
-struct dx_field_map_t {
- dx_field_iterator_t *outer;
- int key_count;
- dx_field_pair_list_t pairs;
-};
-
-ALLOC_DECLARE(dx_field_map_t);
-ALLOC_DEFINE(dx_field_map_t);
-
-
typedef enum {
STATE_START,
STATE_SLASH_LEFT,
@@ -263,7 +240,6 @@ dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view
if (!iter)
return 0;
- iter->tag = 0;
iter->start_pointer.buffer = 0;
iter->start_pointer.cursor = (unsigned char*) text;
iter->start_pointer.length = strlen(text);
@@ -280,7 +256,6 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i
if (!iter)
return 0;
- iter->tag = 0;
iter->start_pointer.buffer = buffer;
iter->start_pointer.cursor = dx_buffer_base(buffer) + offset;
iter->start_pointer.length = length;
@@ -354,6 +329,33 @@ int dx_field_iterator_end(dx_field_iterator_t *iter)
}
+dx_field_iterator_t *dx_field_iterator_sub(dx_field_iterator_t *iter, uint32_t length)
+{
+ dx_field_iterator_t *sub = new_dx_field_iterator_t();
+ if (!sub)
+ return 0;
+
+ sub->start_pointer = iter->pointer;
+ sub->start_pointer.length = length;
+ sub->view_start_pointer = sub->start_pointer;
+ sub->pointer = sub->start_pointer;
+ sub->view = iter->view;
+ sub->mode = iter->mode;
+ sub->at_prefix = 0;
+ sub->view_prefix = 0;
+
+ return sub;
+}
+
+
+void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length)
+{
+ // TODO - Make this more efficient.
+ for (uint8_t idx = 0; idx < length; idx++)
+ dx_field_iterator_octet(iter);
+}
+
+
int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string)
{
dx_field_iterator_reset(iter);
@@ -387,81 +389,6 @@ int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix)
}
-static dx_field_iterator_t *dx_field_parse_amqp_value(dx_field_iterator_t *iter, unsigned int *available)
-{
- if (*available < 1)
- return 0;
-
- unsigned int start = *available;
- dx_field_iterator_t *value = new_dx_field_iterator_t();
- value->start_pointer = iter->pointer;
- value->view = ITER_VIEW_ALL;
- value->mode = MODE_TO_END;
- value->at_prefix = 0;
- value->view_prefix = 0;
-
- unsigned char tag = dx_field_iterator_octet(iter);
- unsigned int length = 0;
- unsigned int length_size = 0;
-
- (*available)--;
-
- switch (tag & 0xF0) {
- case 0x40: length = 0; break;
- case 0x50: length = 1; break;
- case 0x60: length = 2; break;
- case 0x70: length = 4; break;
- case 0x80: length = 8; break;
- case 0x90: length = 16; break;
- case 0xA0:
- case 0xC0:
- case 0xE0: length_size = 1; break;
- case 0xB0:
- case 0xD0:
- case 0xF0: length_size = 4; break;
- default:
- free_dx_field_iterator_t(value);
- return 0;
- }
-
- if (*available < length_size) {
- free_dx_field_iterator_t(value);
- return 0;
- }
-
- if (length_size == 1) {
- length = (unsigned int) dx_field_iterator_octet(iter);
- } else if (length_size == 4) {
- length = ((unsigned int) dx_field_iterator_octet(iter)) << 24;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
- length += (unsigned int) dx_field_iterator_octet(iter);
- }
-
- if (*available < length) {
- free_dx_field_iterator_t(value);
- return 0;
- }
-
- for (unsigned int idx = 0; idx < length; idx++)
- (void) dx_field_iterator_octet(iter);
- (*available) -= (length + length_size);
-
- value->start_pointer.length = start - *available;
- value->view_start_pointer = value->start_pointer;
- value->pointer = value->start_pointer;
- value->tag = tag;
-
- return value;
-}
-
-
-static int dx_tag_is_string(unsigned char tag)
-{
- return (tag == 0xa1 || tag == 0xb1);
-}
-
-
unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
{
int length = 0;
@@ -484,186 +411,6 @@ unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
}
-dx_field_map_t *dx_field_map(dx_field_iterator_t *iter, int string_keys_only)
-{
- dx_field_iterator_reset(iter);
- unsigned char tag = dx_field_iterator_octet(iter);
-
- //
- // If this field is not a map, return 0;
- //
- if (tag != 0xc1 && tag != 0xd1) {
- dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, Unexpected tag: %02x", tag);
- return 0;
- }
-
- //
- // Validate the map. Ensure the following:
- // - There are an even number of fields in the compound structure
- // - There are anough octets in the field to account for all of the contents
- // - The field count matches the number of fields present
- // - The keys are strings (if string_keys_only)
- //
- unsigned int length;
- unsigned int count;
-
- if (tag == 0xc1) {
- length = (unsigned int) dx_field_iterator_octet(iter);
- count = (unsigned int) dx_field_iterator_octet(iter);
- length -= 1; // Account for the 'count' octet
- } else {
- length = ((unsigned int) dx_field_iterator_octet(iter)) << 24;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
- length += (unsigned int) dx_field_iterator_octet(iter);
-
- count = ((unsigned int) dx_field_iterator_octet(iter)) << 24;
- count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
- count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
- count += (unsigned int) dx_field_iterator_octet(iter);
-
- length -= 4; // Account for the 'count' octets
- }
-
- //
- // The map is not valid if count is not an even number.
- //
- if (count & 1) {
- dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, odd number of fields: %d", count);
- return 0;
- }
-
- dx_field_map_t *map = new_dx_field_map_t();
- if (!map)
- return 0;
-
- map->outer = iter;
- map->key_count = count >> 1;
- DEQ_INIT(map->pairs);
-
- unsigned int idx;
- for (idx = 0; idx < map->key_count; idx++) {
- dx_field_iterator_t *key = dx_field_parse_amqp_value(iter, &length);
- dx_field_iterator_t *value = dx_field_parse_amqp_value(iter, &length);
-
- if (key == 0 || value == 0) {
- dx_field_map_free(map);
- return 0;
- }
-
- if (string_keys_only && !dx_tag_is_string(key->tag)) {
- dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, key tag is not a string: %02x", key->tag);
- dx_field_map_free(map);
- return 0;
- }
-
- dx_field_pair_t *pair = new_dx_field_pair_t();
- if (!pair) {
- dx_field_map_free(map);
- return 0;
- }
-
- DEQ_ITEM_INIT(pair);
- pair->key_iter = key;
- pair->value_iter = value;
- DEQ_INSERT_TAIL(map->pairs, pair);
- }
-
- return map;
-}
-
-
-void dx_field_map_free(dx_field_map_t *map)
-{
- if (!map)
- return;
-
- dx_field_pair_t *pair = DEQ_HEAD(map->pairs);
- while (pair) {
- DEQ_REMOVE_HEAD(map->pairs);
- free_dx_field_iterator_t(pair->key_iter);
- free_dx_field_iterator_t(pair->value_iter);
- free_dx_field_pair_t(pair);
- pair = DEQ_HEAD(map->pairs);
- }
-
- free_dx_field_map_t(map);
-}
-
-
-dx_field_iterator_t *dx_field_map_by_key(dx_field_map_t *map, const char *key)
-{
- dx_field_iterator_t *key_string;
- dx_field_iterator_t *value = 0;
- dx_field_pair_t *pair = DEQ_HEAD(map->pairs);
-
- while (pair && !value) {
- key_string = dx_field_raw(pair->key_iter);
- if (dx_field_iterator_equal(key_string, (const unsigned char*) key))
- value = pair->value_iter;
- free_dx_field_iterator_t(key_string);
- pair = DEQ_NEXT(pair);
- }
-
- return value;
-}
-
-
-static unsigned int dx_field_get_length(dx_field_iterator_t *iter, unsigned char tag) {
- unsigned long length = 0;
-
- switch (tag & 0xF0) {
- case 0x40: return 0;
- case 0x50: return 1;
- case 0x60: return 2;
- case 0x70: return 4;
- case 0x80: return 8;
- case 0x90: return 16;
- case 0xB0:
- case 0xD0:
- case 0xF0:
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
- length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
- // fall through to the next case
-
- case 0xA0:
- case 0xC0:
- case 0xE0:
- length += (unsigned int) dx_field_iterator_octet(iter);
- break;
-
- default:
- return 0;
- }
-
- return length;
-}
-
-
-dx_field_iterator_t *dx_field_raw(dx_field_iterator_t *iter)
-{
- dx_field_iterator_reset(iter);
- unsigned char tag = dx_field_iterator_octet(iter);
- unsigned int length = dx_field_get_length(iter, tag);
-
- dx_field_iterator_t *result = new_dx_field_iterator_t();
- if (!result)
- return 0;
- result->start_pointer = iter->pointer;
- result->start_pointer.length = length;
- result->view_start_pointer = result->start_pointer;
- result->pointer = result->start_pointer;
- result->view = ITER_VIEW_ALL;
- result->mode = MODE_TO_END;
- result->at_prefix = 0;
- result->view_prefix = 0;
- result->tag = 0;
-
- return result;
-}
-
-
dx_iovec_t *dx_field_iterator_iovec(const dx_field_iterator_t *iter)
{
assert(!iter->view_prefix); // Not supported for views with a prefix
diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c
index 8576bd5146..6d75de76db 100644
--- a/qpid/extras/dispatch/src/message.c
+++ b/qpid/extras/dispatch/src/message.c
@@ -18,6 +18,7 @@
*/
#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/threading.h>
#include "message_private.h"
#include "compose_private.h"
diff --git a/qpid/extras/dispatch/src/parse.c b/qpid/extras/dispatch/src/parse.c
new file mode 100644
index 0000000000..5a9c26b7ca
--- /dev/null
+++ b/qpid/extras/dispatch/src/parse.c
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/amqp.h>
+
+DEQ_DECLARE(dx_parsed_field_t, dx_parsed_field_list_t);
+
+struct dx_parsed_field_t {
+ DEQ_LINKS(dx_parsed_field_t);
+ dx_parsed_field_t *parent;
+ dx_parsed_field_list_t children;
+ uint8_t tag;
+ dx_field_iterator_t *raw_iter;
+ const char *parse_error;
+};
+
+ALLOC_DECLARE(dx_parsed_field_t);
+ALLOC_DEFINE(dx_parsed_field_t);
+
+
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+{
+ if (dx_field_iterator_end(iter))
+ return "Insufficient Data to Determine Tag";
+ *tag = dx_field_iterator_octet(iter);
+ *count = 0;
+ *length = 0;
+
+ switch (*tag & 0xF0) {
+ case 0x40: *length = 0; break;
+ case 0x50: *length = 1; break;
+ case 0x60: *length = 2; break;
+ case 0x70: *length = 4; break;
+ case 0x80: *length = 8; break;
+ case 0x90: *length = 16; break;
+ case 0xB0:
+ case 0xD0:
+ case 0xF0:
+ *length += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
+ *length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
+ *length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+ // fall through to the next case
+
+ case 0xA0:
+ case 0xC0:
+ case 0xE0:
+ if (dx_field_iterator_end(iter))
+ return "Insufficient Data to Determine Length";
+ *length += (unsigned int) dx_field_iterator_octet(iter);
+ break;
+
+ default:
+ return "Invalid Tag - No Length Information";
+ }
+
+ switch (*tag & 0xF0) {
+ case 0xD0:
+ case 0xF0:
+ *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
+ *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
+ *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+ // fall through to the next case
+
+ case 0xC0:
+ case 0xE0:
+ if (dx_field_iterator_end(iter))
+ return "Insufficient Data to Determine Count";
+ *count += (unsigned int) dx_field_iterator_octet(iter);
+ break;
+ }
+
+ if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1))
+ return "Odd Number of Elements in a Map";
+
+ return 0;
+}
+
+
+static dx_parsed_field_t *dx_parse_internal(dx_field_iterator_t *iter, dx_parsed_field_t *p)
+{
+ dx_parsed_field_t *field = new_dx_parsed_field_t();
+ if (!field)
+ return 0;
+
+ DEQ_ITEM_INIT(field);
+ DEQ_INIT(field->children);
+ field->parent = p;
+ field->raw_iter = 0;
+
+ uint32_t length;
+ uint32_t count;
+
+ field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+
+ if (!field->parse_error) {
+ field->raw_iter = dx_field_iterator_sub(iter, length);
+ if (count == 0 && length > 0)
+ dx_field_iterator_advance(iter, length);
+ for (uint32_t idx = 0; idx < count; idx++) {
+ dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
+ DEQ_INSERT_TAIL(field->children, child);
+ if (!dx_parse_ok(child)) {
+ field->parse_error = child->parse_error;
+ break;
+ }
+ }
+ }
+
+ return field;
+}
+
+
+dx_parsed_field_t *dx_parse(dx_field_iterator_t *iter)
+{
+ return dx_parse_internal(iter, 0);
+}
+
+
+void dx_parse_free(dx_parsed_field_t *field)
+{
+ if (!field)
+ return;
+
+ assert(field->parent == 0);
+ if (field->raw_iter)
+ dx_field_iterator_free(field->raw_iter);
+
+ dx_parsed_field_t *sub_field = DEQ_HEAD(field->children);
+ while (sub_field) {
+ dx_parsed_field_t *next = DEQ_NEXT(sub_field);
+ DEQ_REMOVE_HEAD(field->children);
+ sub_field->parent = 0;
+ dx_parse_free(sub_field);
+ sub_field = next;
+ }
+
+ free_dx_parsed_field_t(field);
+}
+
+
+int dx_parse_ok(dx_parsed_field_t *field)
+{
+ return field->parse_error == 0;
+}
+
+
+const char *dx_parse_error(dx_parsed_field_t *field)
+{
+ return field->parse_error;
+}
+
+
+uint8_t dx_parse_tag(dx_parsed_field_t *field)
+{
+ return field->tag;
+}
+
+
+dx_field_iterator_t *dx_parse_raw(dx_parsed_field_t *field)
+{
+ return field->raw_iter;
+}
+
+
+uint32_t dx_parse_as_uint(dx_parsed_field_t *field)
+{
+ uint32_t result = 0;
+
+ dx_field_iterator_reset(field->raw_iter);
+
+ switch (field->tag) {
+ case DX_AMQP_UINT:
+ result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 24;
+
+ case DX_AMQP_USHORT:
+ result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 16;
+ result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 8;
+ // Fall Through...
+
+ case DX_AMQP_UBYTE:
+ case DX_AMQP_SMALLUINT:
+ case DX_AMQP_BOOLEAN:
+ result |= (uint32_t) dx_field_iterator_octet(field->raw_iter);
+ break;
+
+ case DX_AMQP_TRUE:
+ result = 1;
+ break;
+ }
+
+ return result;
+}
+
+
+uint64_t dx_parse_as_ulong(dx_parsed_field_t *field)
+{
+ uint64_t result = 0;
+
+ dx_field_iterator_reset(field->raw_iter);
+
+ switch (field->tag) {
+ case DX_AMQP_ULONG:
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 56;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 48;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 40;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 32;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 24;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 16;
+ result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 8;
+ // Fall Through...
+
+ case DX_AMQP_SMALLULONG:
+ result |= (uint64_t) dx_field_iterator_octet(field->raw_iter);
+ // Fall Through...
+
+ case DX_AMQP_ULONG0:
+ break;
+ }
+
+ return result;
+}
+
+
+int32_t dx_parse_as_int(dx_parsed_field_t *field)
+{
+ int32_t result = 0;
+
+ dx_field_iterator_reset(field->raw_iter);
+
+ switch (field->tag) {
+ case DX_AMQP_INT:
+ result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 24;
+
+ case DX_AMQP_SHORT:
+ result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 16;
+ result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 8;
+ // Fall Through...
+
+ case DX_AMQP_BYTE:
+ case DX_AMQP_SMALLINT:
+ case DX_AMQP_BOOLEAN:
+ result |= (int32_t) dx_field_iterator_octet(field->raw_iter);
+ break;
+
+ case DX_AMQP_TRUE:
+ result = 1;
+ break;
+ }
+
+ return result;
+}
+
+
+int64_t dx_parse_as_long(dx_parsed_field_t *field)
+{
+ int64_t result = 0;
+
+ dx_field_iterator_reset(field->raw_iter);
+
+ switch (field->tag) {
+ case DX_AMQP_LONG:
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 56;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 48;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 40;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 32;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 24;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 16;
+ result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 8;
+ // Fall Through...
+
+ case DX_AMQP_SMALLLONG:
+ result |= (uint64_t) dx_field_iterator_octet(field->raw_iter);
+ break;
+ }
+
+ return result;
+}
+
+
+uint32_t dx_parse_sub_count(dx_parsed_field_t *field)
+{
+ uint32_t count = DEQ_SIZE(field->children);
+
+ if (field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32)
+ count = count >> 1;
+
+ return count;
+}
+
+
+dx_parsed_field_t *dx_parse_sub_key(dx_parsed_field_t *field, uint32_t idx)
+{
+ if (field->tag != DX_AMQP_MAP8 && field->tag != DX_AMQP_MAP32)
+ return 0;
+
+ idx = idx << 1;
+ dx_parsed_field_t *key = DEQ_HEAD(field->children);
+ while (idx && key) {
+ idx--;
+ key = DEQ_NEXT(key);
+ }
+
+ return key;
+}
+
+
+dx_parsed_field_t *dx_parse_sub_value(dx_parsed_field_t *field, uint32_t idx)
+{
+ if (field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32)
+ idx = (idx << 1) + 1;
+
+ dx_parsed_field_t *key = DEQ_HEAD(field->children);
+ while (idx && key) {
+ idx--;
+ key = DEQ_NEXT(key);
+ }
+
+ return key;
+}
+
+
+int dx_parse_is_map(dx_parsed_field_t *field)
+{
+ return field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32;
+}
+
+
+int dx_parse_is_list(dx_parsed_field_t *field)
+{
+ return field->tag == DX_AMQP_LIST8 || field->tag == DX_AMQP_LIST32;
+}
+
+
+int dx_parse_is_scalar(dx_parsed_field_t *field)
+{
+ return DEQ_SIZE(field->children) == 0;
+}
+
+
+dx_parsed_field_t *dx_parse_value_by_key(dx_parsed_field_t *field, const char *key)
+{
+ uint32_t count = dx_parse_sub_count(field);
+
+ for (uint32_t idx = 0; idx < count; idx++) {
+ dx_parsed_field_t *sub = dx_parse_sub_key(field, idx);
+ if (!sub)
+ return 0;
+
+ dx_field_iterator_t *iter = dx_parse_raw(sub);
+ if (!iter)
+ return 0;
+
+ if (dx_field_iterator_equal(iter, (const unsigned char*) key)) {
+ return dx_parse_sub_value(field, idx);
+ }
+ }
+
+ return 0;
+}
+