diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-26 14:23:16 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-07-08 15:13:32 -0700 |
commit | 33ebeede97a81c2e82ac0c3a6d88d4db0695bf29 (patch) | |
tree | 1151d9a060cb9afdf6ac25b66ddbdd03e46f8674 /librabbitmq/amqp_socket.c | |
parent | 08af83a97a99c08aaa3878724a07829e0bb955da (diff) | |
download | rabbitmq-c-github-ask-33ebeede97a81c2e82ac0c3a6d88d4db0695bf29.tar.gz |
Add a high level API for consuming messages
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index be85ff5..ef9debd 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -782,6 +782,80 @@ beginrecv: } } +int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link; + amqp_frame_t *frame_copy; + + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); + + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } + + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + + if (NULL == link || NULL == frame_copy) { + return AMQP_STATUS_NO_MEMORY; + } + + *frame_copy = *frame; + link->data = frame_copy; + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + + link->next = NULL; + state->last_queued_frame = link; + + return AMQP_STATUS_OK; +} + +int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame) +{ + amqp_frame_t *frame_ptr; + amqp_link_t *cur; + int res; + + for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { + frame_ptr = cur->data; + + if (channel == frame_ptr->channel) { + state->first_queued_frame = cur->next; + if (NULL == state->first_queued_frame) { + state->last_queued_frame = NULL; + } + + *decoded_frame = *frame_ptr; + + return AMQP_STATUS_OK; + } + } + + while (1) { + res = wait_frame_inner(state, decoded_frame, NULL); + + if (AMQP_STATUS_OK != res) { + return res; + } + + if (channel == decoded_frame->channel) { + return AMQP_STATUS_OK; + } else { + res = amqp_queue_frame(state, decoded_frame); + if (res != AMQP_STATUS_OK) { + return res; + } + } + } +} + int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { |