diff options
-rw-r--r-- | librabbitmq/amqp.h | 115 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 242 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 26 | ||||
-rw-r--r-- | librabbitmq/codegen.py | 111 |
4 files changed, 164 insertions, 330 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 1541583..996372a 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -319,6 +319,28 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_method_number_t *expected_reply_ids, void *decoded_request_method); +RABBITMQ_EXPORT void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method); + +/* + * The API methods corresponding to most synchronous AMQP methods + * return a pointer to the decoded method result. Upon error, they + * return NULL, and we need some way of discovering what, if anything, + * went wrong. amqp_get_rpc_reply() returns the most recent + * amqp_rpc_reply_t instance corresponding to such an API operation + * for the given connection. + * + * Only use it for operations that do not themselves return + * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t + * generally do NOT update this per-connection-global amqp_rpc_reply_t + * instance. + */ +RABBITMQ_EXPORT amqp_rpc_reply_t amqp_get_rpc_reply( + amqp_connection_state_t state); + RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, @@ -326,10 +348,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, int heartbeat, amqp_sasl_method_enum sasl_method, ...); -RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open( - amqp_connection_state_t state, - amqp_channel_t channel); - struct amqp_basic_properties_t_; RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, @@ -348,58 +366,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close( amqp_connection_state_t state, int code); -RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty); - -RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter); - RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -410,23 +376,10 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_ack); -RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait); - -RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select( - amqp_connection_state_t state, - amqp_channel_t channel); - -RABBITMQ_EXPORT struct amqp_tx_commit_ok_t_ *amqp_tx_commit( - amqp_connection_state_t state, - amqp_channel_t channel); - -RABBITMQ_EXPORT struct amqp_tx_rollback_ok_t_ *amqp_tx_rollback( - amqp_connection_state_t state, - amqp_channel_t channel); +RABBITMQ_EXPORT int amqp_basic_reject(amqp_connection_state_t state, + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t requeue); /* * Can be used to see if there is data still in the buffer, if so @@ -439,22 +392,6 @@ RABBITMQ_EXPORT amqp_boolean_t amqp_data_in_buffer( amqp_connection_state_t state); /* - * For those API operations (such as amqp_basic_ack, - * amqp_queue_declare, and so on) that do not themselves return - * amqp_rpc_reply_t instances, we need some way of discovering what, - * if anything, went wrong. amqp_get_rpc_reply() returns the most - * recent amqp_rpc_reply_t instance corresponding to such an API - * operation for the given connection. - * - * Only use it for operations that do not themselves return - * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t - * generally do NOT update this per-connection-global amqp_rpc_reply_t - * instance. - */ -RABBITMQ_EXPORT amqp_rpc_reply_t amqp_get_rpc_reply( - amqp_connection_state_t state); - -/* * Get the error string for the given error code. * * The returned string resides on the heap; the caller is responsible diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index bf19761..37f6605 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -119,23 +119,6 @@ const amqp_array_t amqp_empty_array = { 0, NULL }; ? (replytype *) state->most_recent_api_result.reply.decoded \ : NULL) -amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; - amqp_channel_open_t req; - req.out_of_band.bytes = NULL; - req.out_of_band.len = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_CHANNEL_OPEN_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, @@ -238,162 +221,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, replies, &req); } -amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; - amqp_exchange_declare_t req; - req.exchange = exchange; - req.type = type; - req.passive = passive; - req.durable = durable; - req.auto_delete = 0; - req.internal = 0; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_EXCHANGE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; - amqp_queue_declare_t req; - req.queue = queue; - req.passive = passive; - req.durable = durable; - req.exclusive = exclusive; - req.auto_delete = auto_delete; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; - amqp_queue_delete_t req; - req.queue = queue; - req.if_unused = if_unused; - req.if_empty = if_empty; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DELETE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; - amqp_queue_bind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_BIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; - amqp_queue_unbind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_UNBIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter) -{ - amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; - amqp_basic_consume_t req; - req.ticket = 0; - req.queue = queue; - req.consumer_tag = consumer_tag; - req.no_local = no_local; - req.no_ack = no_ack; - req.exclusive = exclusive; - req.nowait = 0; - req.arguments = filter; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_CONSUME_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -405,26 +232,6 @@ int amqp_basic_ack(amqp_connection_state_t state, return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } -amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; - amqp_queue_purge_t req; - req.ticket = 0; - req.queue = queue; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_PURGE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, @@ -444,46 +251,13 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, return state->most_recent_api_result; } -amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_SELECT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, - amqp_channel_t channel) +int amqp_basic_reject(amqp_connection_state_t state, + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t requeue) { - amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_COMMIT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_ROLLBACK_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) -{ - return state->most_recent_api_result; + amqp_basic_reject_t req; + req.delivery_tag = delivery_tag; + req.requeue = requeue; + return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req); } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index f23b42b..f9666b4 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -351,6 +351,32 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } } +void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) +{ + amqp_method_number_t replies[2]; + + replies[0] = reply_id; + replies[1] = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + request_id, replies, + decoded_request_method); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; +} + +amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) +{ + return state->most_recent_api_result; +} + + static int amqp_login_inner(amqp_connection_state_t state, int channel_max, int frame_max, diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 6fd149e..b52cf3b 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -112,7 +112,7 @@ class BitEncoder(object): self.emitter.emit(line) def encode_bit(self, value): - """Generate code to ebcode a value of the AMQP bit type from + """Generate code to encode a value of the AMQP bit type from the given value.""" if self.bit == 0: self.emitter.emit("bit_buffer = 0;") @@ -138,6 +138,8 @@ class SimpleType(object): def encode(self, emitter, value): emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value)) + def literal(self, value): + return value class StrType(object): """The AMQP shortstr or longstr types.""" @@ -159,6 +161,11 @@ class StrType(object): emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,)) emitter.emit(" return -ERROR_BAD_AMQP_DATA;") + def literal(self, value): + if value != '': + raise NotImplementedError() + + return "amqp_empty_bytes" class BitType(object): """The AMQP bit type.""" @@ -172,6 +179,8 @@ class BitType(object): def encode(self, emitter, value): emitter.encode_bit(value) + def literal(self, value): + return {True: 1, False: 0}[value] class TableType(object): """The AMQP table type.""" @@ -191,6 +200,8 @@ class TableType(object): emitter.emit(" if (res < 0) return res;") emitter.emit("}") + def literal(self, value): + raise NotImplementedError() types = { 'octet': SimpleType(8), @@ -213,11 +224,54 @@ def c_ize(s): s = s.replace(' ', '_') return s +# When generating API functions corresponding to synchronous methods, +# we need some information that isn't in the protocol def: Some +# methods should not be exposed, indicated here by a False value. +# Some methods should be exposed but certain fields should not be +# exposed as parameters. +apiMethodInfo = { + "amqp_connection_start": False, # application code should not use this + "amqp_connection_secure": False, # application code should not use this + "amqp_connection_tune": False, # application code should not use this + "amqp_connection_open": False, # application code should not use this + "amqp_connection_close": False, # needs special handling + "amqp_channel_open": ["out_of_band"], + "amqp_channel_close": False, # needs special handling + "amqp_access_request": False, # huh? + "amqp_exchange_declare": ["auto_delete", "internal"], + "amqp_basic_get": False, # get-ok has content +} + +# When generating API functions corresponding to synchronous methods, +# some fields should be suppressed everywhere. This dict names those +# fields, and the fixed values to use for them. +apiMethodsSuppressArgs = {"ticket": 0, "nowait": False} + AmqpMethod.defName = lambda m: cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + "_method") -AmqpMethod.structName = lambda m: "amqp_" + c_ize(m.klass.name) + '_' + c_ize(m.name) + "_t" +AmqpMethod.fullName = lambda m: "amqp_%s_%s" % (c_ize(m.klass.name), c_ize(m.name)) +AmqpMethod.structName = lambda m: m.fullName() + "_t" AmqpClass.structName = lambda c: "amqp_" + c_ize(c.name) + "_properties_t" +def methodApiPrototype(m): + fn = m.fullName() + info = apiMethodInfo.get(fn, []) + + args = [] + for f in m.arguments: + n = c_ize(f.name) + if n in apiMethodsSuppressArgs or n in info: + continue + + args.append(", ") + args.append(typeFor(m.klass.spec, f).ctype) + args.append(" ") + args.append(n) + + return "%s_ok_t *%s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args)) + +AmqpMethod.apiPrototype = methodApiPrototype + def cConstantName(s): return 'AMQP_' + '_'.join(re.split('[- ]', s.upper())) @@ -424,13 +478,48 @@ int amqp_encode_properties(uint16_t class_id, remaining_flags = remainder; } while (remaining_flags != 0); } - + switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) print """ default: return -ERROR_UNKNOWN_CLASS; } }""" + for m in methods: + if not m.isSynchronous: + continue + + info = apiMethodInfo.get(m.fullName(), []) + if info is False: + continue + + print + print m.apiPrototype() + print "{" + print " %s req;" % (m.structName(),) + + for f in m.arguments: + n = c_ize(f.name) + + val = apiMethodsSuppressArgs.get(n) + if val is None and n in info: + val = f.defaultvalue + + if val is None: + val = n + else: + val = typeFor(spec, f).literal(val) + + + print " req.%s = %s;" % (n, val) + + reply = cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + + "_ok_method") + print """ + return amqp_simple_rpc_decoded(state, channel, %s, %s, &req); +} +""" % (m.defName(), reply) + def genHrl(spec): def fieldDeclList(fields): if fields: @@ -439,7 +528,7 @@ def genHrl(spec): for f in fields]) else: return " char dummy; /* Dummy field to avoid empty struct */\n" - + def propDeclList(fields): return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) for f in fields @@ -465,6 +554,7 @@ extern "C" { print print """/* Function prototypes. */ + extern char const *amqp_constant_name(int constantNumber); extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber); RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber); @@ -485,7 +575,7 @@ extern int amqp_encode_properties(uint16_t class_id, amqp_bytes_t encoded); """ - print "/* Method field records. */" + print "/* Method field records. */\n" for m in methods: methodid = m.klass.index << 16 | m.index print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \ @@ -515,7 +605,14 @@ extern int amqp_encode_properties(uint16_t class_id, fieldDeclList(c.fields), c.structName()) - print """#ifdef __cplusplus + print "/* API functions for methods */\n" + + for m in methods: + if m.isSynchronous and apiMethodInfo.get(m.fullName()) is not False: + print "RABBITMQ_EXPORT %s;" % (m.apiPrototype(),) + + print """ +#ifdef __cplusplus } #endif @@ -526,6 +623,6 @@ def generateErl(specPath): def generateHrl(specPath): genHrl(AmqpSpec(specPath)) - + if __name__ == "__main__": do_main(generateHrl, generateErl) |