summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_private.h
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_private.h')
-rw-r--r--librabbitmq/amqp_private.h197
1 files changed, 119 insertions, 78 deletions
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index c30663a..eaf6b2c 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -51,9 +51,7 @@
* ***** END LICENSE BLOCK *****
*/
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include "config.h"
/* Error numbering: Because of differences in error numbering on
* different platforms, we want to keep error numbers opaque for
@@ -78,42 +76,43 @@ extern "C" {
extern char *amqp_os_error_string(int err);
+#include "socket.h"
+
/*
- * Connection states:
+ * Connection states: XXX FIX THIS
*
- * - CONNECTION_STATE_IDLE: initial state, and entered again after
- * each frame is completed. Means that no bytes of the next frame
- * have been seen yet. Connections may only be reconfigured, and the
+ * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be
+ * sure if the next thing we will get is the first AMQP frame, or a
+ * protocol header from the server.
+ *
+ * - CONNECTION_STATE_IDLE: The normal state between
+ * frames. Connections may only be reconfigured, and the
* connection's pools recycled, when in this state. Whenever we're
* in this state, the inbound_buffer's bytes pointer must be NULL;
* any other state, and it must point to a block of memory allocated
* from the frame_pool.
*
- * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
- * frame have been seen, but not a complete frame header's worth.
- *
- * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
- * been seen, but the frame is not yet complete. When it is
- * completed, it will be returned, and the connection will return to
- * IDLE state.
+ * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have
+ * been seen, but not a complete frame header's worth.
*
- * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
- * protocol version header has been seen, but the full eight bytes
- * hasn't yet been received. When it is completed, it will be
+ * - CONNECTION_STATE_BODY: A complete frame header has been seen, but
+ * the frame is not yet complete. When it is completed, it will be
* returned, and the connection will return to IDLE state.
*
*/
typedef enum amqp_connection_state_enum_ {
CONNECTION_STATE_IDLE = 0,
- CONNECTION_STATE_WAITING_FOR_HEADER,
- CONNECTION_STATE_WAITING_FOR_BODY,
- CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER
+ CONNECTION_STATE_INITIAL,
+ CONNECTION_STATE_HEADER,
+ CONNECTION_STATE_BODY
} amqp_connection_state_enum;
/* 7 bytes up front, then payload, then 1 byte footer */
#define HEADER_SIZE 7
#define FOOTER_SIZE 1
+#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
+
typedef struct amqp_link_t_ {
struct amqp_link_t_ *next;
void *data;
@@ -146,65 +145,107 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
-#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); })
-#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o]))
-
-#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
-#define D_16(b, o) CHECK_LIMIT(b, o, 2, ({uint16_t v; memcpy(&v, BUF_AT(b, o), 2); ntohs(v);}))
-#define D_32(b, o) CHECK_LIMIT(b, o, 4, ({uint32_t v; memcpy(&v, BUF_AT(b, o), 4); ntohl(v);}))
-#define D_64(b, o) ({ \
- uint64_t hi = D_32(b, o); \
- uint64_t lo = D_32(b, o + 4); \
- hi << 32 | lo; \
-})
-
-#define D_BYTES(b, o, l) CHECK_LIMIT(b, o, l, BUF_AT(b, o))
-
-#define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v))
-#define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);}))
-#define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);}))
-#define E_64(b, o, v) ({ \
- E_32(b, o, (uint32_t) (((uint64_t) v) >> 32)); \
- E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \
- })
-
-#define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l)))
-
-extern int amqp_decode_table(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_table_t *output,
- int *offsetptr);
-
-extern int amqp_encode_table(amqp_bytes_t encoded,
- amqp_table_t *input,
- int *offsetptr);
-
-#define amqp_assert(condition, ...) \
- ({ \
- if (!(condition)) { \
- fprintf(stderr, __VA_ARGS__); \
- fputc('\n', stderr); \
- abort(); \
- } \
- })
-
-#define AMQP_CHECK_RESULT_CLEANUP(expr, stmts) \
- ({ \
- int _result = (expr); \
- if (_result < 0) { stmts; return _result; } \
- _result; \
- })
-
-#define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, )
-
-#ifndef NDEBUG
-extern void amqp_dump(void const *buffer, size_t len);
-#else
-#define amqp_dump(buffer, len) ((void) 0)
-#endif
+static inline void *amqp_offset(void *data, size_t offset)
+{
+ return (char *)data + offset;
+}
-#ifdef __cplusplus
+/* assuming a machine that supports unaligned accesses (for now) */
+
+#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
+ \
+static inline void amqp_e##bits(void *data, size_t offset, \
+ uint##bits##_t val) \
+{ \
+ *(uint##bits##_t *)amqp_offset(data, offset) = htonx(val); \
+} \
+ \
+static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
+{ \
+ return ntohx(*(uint##bits##_t *)amqp_offset(data, offset)); \
+} \
+ \
+static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t input) \
+ \
+{ \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ amqp_e##bits(encoded.bytes, o, input); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+} \
+ \
+static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t *output) \
+ \
+{ \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ *output = amqp_d##bits(encoded.bytes, o); \
+ *output = ntohx(*(uint##bits##_t *)((char *)encoded.bytes + o)); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
}
-#endif
+
+/* assuming little endian (for now) */
+
+#define DECLARE_XTOXLL(func) \
+static inline uint64_t func##ll(uint64_t val) \
+{ \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ uint32_t t; \
+ u.whole = val; \
+ t = u.halves[0]; \
+ u.halves[0] = func##l(u.halves[1]); \
+ u.halves[1] = func##l(t); \
+ return u.whole; \
+}
+
+DECLARE_XTOXLL(hton)
+DECLARE_XTOXLL(ntoh)
+
+DECLARE_CODEC_BASE_TYPE(8, (uint8_t), (uint8_t))
+DECLARE_CODEC_BASE_TYPE(16, htons, ntohs)
+DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl)
+DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll)
+
+static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
+ amqp_bytes_t input)
+{
+ size_t o = *offset;
+ if ((*offset = o + input.len) <= encoded.len) {
+ memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
+ amqp_bytes_t *output, size_t len)
+{
+ size_t o = *offset;
+ if ((*offset = o + len) <= encoded.len) {
+ output->bytes = amqp_offset(encoded.bytes, o);
+ output->len = len;
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+extern void amqp_abort(const char *fmt, ...);
#endif