summaryrefslogtreecommitdiff
path: root/extras/dispatch/src/container.c
diff options
context:
space:
mode:
Diffstat (limited to 'extras/dispatch/src/container.c')
-rw-r--r--extras/dispatch/src/container.c616
1 files changed, 616 insertions, 0 deletions
diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c
new file mode 100644
index 0000000000..68e2afa3eb
--- /dev/null
+++ b/extras/dispatch/src/container.c
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+
+static char *module="CONTAINER";
+
+struct dx_node_t {
+ const dx_node_type_t *ntype;
+ char *name;
+ void *context;
+ dx_dist_mode_t supported_dist;
+ dx_lifetime_policy_t life_policy;
+};
+
+ALLOC_DECLARE(dx_node_t);
+ALLOC_DEFINE(dx_node_t);
+ALLOC_DEFINE(dx_link_item_t);
+
+struct dx_link_t {
+ pn_link_t *pn_link;
+ void *context;
+ dx_node_t *node;
+};
+
+ALLOC_DECLARE(dx_link_t);
+ALLOC_DEFINE(dx_link_t);
+
+typedef struct nxc_node_type_t {
+ DEQ_LINKS(struct nxc_node_type_t);
+ const dx_node_type_t *ntype;
+} nxc_node_type_t;
+DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
+
+
+static hash_t *node_type_map;
+static hash_t *node_map;
+static sys_mutex_t *lock;
+static dx_node_t *default_node;
+static nxc_node_type_list_t node_type_list;
+
+static void setup_outgoing_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured source
+
+ if (source) {
+ iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->outgoing_handler(node->context, link);
+}
+
+
+static void setup_incoming_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured target
+
+ if (target) {
+ iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->incoming_handler(node->context, link);
+}
+
+
+static int do_writable(pn_link_t *pn_link)
+{
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ if (!link)
+ return 0;
+
+ dx_node_t *node = link->node;
+ if (!node)
+ return 0;
+
+ return node->ntype->writable_handler(node->context, link);
+}
+
+
+static void process_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->rx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ //
+ // Reject the delivery if we couldn't find a node to handle it
+ //
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+}
+
+
+static void do_send(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->tx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ // TODO - Cancel the delivery
+}
+
+
+static void do_updated(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->disp_handler(node->context, link, delivery);
+ }
+}
+
+
+static int close_handler(void* unused, pn_connection_t *conn)
+{
+ //
+ // Close all links, passing False as the 'closed' argument. These links are not
+ // being properly 'detached'. They are being orphaned.
+ //
+ pn_link_t *pn_link = pn_link_head(conn, 0);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 0);
+ pn_link_close(pn_link);
+ free_dx_link_t(link);
+ pn_link = pn_link_next(pn_link, 0);
+ }
+
+ // teardown all sessions
+ pn_session_t *ssn = pn_session_head(conn, 0);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, 0);
+ }
+
+ // teardown the connection
+ pn_connection_close(conn);
+ return 0;
+}
+
+
+static int process_handler(void* unused, pn_connection_t *conn)
+{
+ pn_session_t *ssn;
+ pn_link_t *pn_link;
+ pn_delivery_t *delivery;
+ int event_count = 0;
+
+ // Step 1: setup the engine's connection, and any sessions and links
+ // that may be pending.
+
+ // initialize the connection if it's new
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+ pn_connection_open(conn);
+ event_count++;
+ }
+
+ // open all pending sessions
+ ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
+ while (ssn) {
+ pn_session_open(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+ // configure and open any pending links
+ pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
+ while (pn_link) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(pn_link);
+ else
+ setup_incoming_link(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+
+ // Step 2: Now drain all the pending deliveries from the connection's
+ // work queue and process them
+
+ delivery = pn_work_head(conn);
+ while (delivery) {
+ if (pn_delivery_readable(delivery))
+ process_receive(delivery);
+ else if (pn_delivery_writable(delivery))
+ do_send(delivery);
+
+ if (pn_delivery_updated(delivery))
+ do_updated(delivery);
+
+ delivery = pn_work_next(delivery);
+ event_count++;
+ }
+
+ //
+ // Step 2.5: Traverse all of the links on the connection looking for
+ // outgoing links with non-zero credit. Call the attached node's
+ // writable handler for such links.
+ //
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+ if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+ event_count += do_writable(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
+
+ // Step 3: Clean up any links or sessions that have been closed by the
+ // remote. If the connection has been closed remotely, clean that up
+ // also.
+
+ // teardown any terminating links
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown any terminating sessions
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown the connection if it's terminating
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_connection_close(conn);
+ event_count++;
+ }
+
+ return event_count;
+}
+
+
+static void open_handler(dx_connection_t *conn, dx_direction_t dir)
+{
+ const dx_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(lock);
+ nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
+ sys_mutex_unlock(lock);
+
+ pn_connection_open(dx_connection_pn(conn));
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (dir == DX_INCOMING) {
+ if (nt->inbound_conn_open_handler)
+ nt->inbound_conn_open_handler(nt->type_context, conn);
+ } else {
+ if (nt->outbound_conn_open_handler)
+ nt->outbound_conn_open_handler(nt->type_context, conn);
+ }
+
+ sys_mutex_lock(lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(lock);
+ }
+}
+
+
+static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
+{
+ pn_connection_t *conn = dx_connection_pn(dx_conn);
+
+ switch (event) {
+ case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break;
+ case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break;
+ case DX_CONN_EVENT_CLOSE: return close_handler(context, conn);
+ case DX_CONN_EVENT_PROCESS: return process_handler(context, conn);
+ }
+
+ return 0;
+}
+
+
+void dx_container_initialize(void)
+{
+ dx_log(module, LOG_TRACE, "Container Initializing");
+
+ node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
+ node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ lock = sys_mutex();
+ default_node = 0;
+ DEQ_INIT(node_type_list);
+
+ dx_server_set_conn_handler(handler);
+}
+
+
+void dx_container_finalize(void)
+{
+}
+
+
+int dx_container_register_node_type(const dx_node_type_t *nt)
+{
+ int result;
+ dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
+ nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
+ DEQ_ITEM_INIT(nt_item);
+ nt_item->ntype = nt;
+
+ sys_mutex_lock(lock);
+ result = hash_insert_const(node_type_map, iter, nt);
+ DEQ_INSERT_TAIL(node_type_list, nt_item);
+ sys_mutex_unlock(lock);
+
+ dx_field_iterator_free(iter);
+ if (result < 0)
+ return result;
+ dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);
+
+ return 0;
+}
+
+
+void dx_container_set_default_node_type(const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
+{
+ if (default_node)
+ dx_container_destroy_node(default_node);
+
+ if (nt) {
+ default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
+ dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
+ } else {
+ default_node = 0;
+ dx_log(module, LOG_TRACE, "Default node removed");
+ }
+}
+
+
+dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
+ const char *name,
+ void *context,
+ dx_dist_mode_t supported_dist,
+ dx_lifetime_policy_t life_policy)
+{
+ int result;
+ dx_node_t *node = new_dx_node_t();
+ if (!node)
+ return 0;
+
+ node->ntype = nt;
+ node->name = 0;
+ node->context = context;
+ node->supported_dist = supported_dist;
+ node->life_policy = life_policy;
+
+ if (name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ result = hash_insert(node_map, iter, node);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ if (result < 0) {
+ free_dx_node_t(node);
+ return 0;
+ }
+
+ node->name = (char*) malloc(strlen(name) + 1);
+ strcpy(node->name, name);
+ }
+
+ if (name)
+ dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);
+
+ return node;
+}
+
+
+void dx_container_destroy_node(dx_node_t *node)
+{
+ if (node->name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ hash_remove(node_map, iter);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ free(node->name);
+ }
+
+ free_dx_node_t(node);
+}
+
+
+void dx_container_node_set_context(dx_node_t *node, void *node_context)
+{
+ node->context = node_context;
+}
+
+
+dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node)
+{
+ return node->supported_dist;
+}
+
+
+dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node)
+{
+ return node->life_policy;
+}
+
+
+dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name)
+{
+ pn_session_t *sess = pn_session(dx_connection_pn(conn));
+ dx_link_t *link = new_dx_link_t();
+
+ if (dir == DX_OUTGOING)
+ link->pn_link = pn_sender(sess, name);
+ else
+ link->pn_link = pn_receiver(sess, name);
+ link->context = node->context;
+ link->node = node;
+
+ pn_link_set_context(link->pn_link, link);
+
+ pn_session_open(sess);
+
+ return link;
+}
+
+
+void dx_link_set_context(dx_link_t *link, void *context)
+{
+ link->context = context;
+}
+
+
+void *dx_link_get_context(dx_link_t *link)
+{
+ return link->context;
+}
+
+
+pn_link_t *dx_link_pn(dx_link_t *link)
+{
+ return link->pn_link;
+}
+
+
+pn_terminus_t *dx_link_source(dx_link_t *link)
+{
+ return pn_link_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_target(dx_link_t *link)
+{
+ return pn_link_target(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_source(dx_link_t *link)
+{
+ return pn_link_remote_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_target(dx_link_t *link)
+{
+ return pn_link_remote_target(link->pn_link);
+}
+
+
+void dx_link_activate(dx_link_t *link)
+{
+ if (!link || !link->pn_link)
+ return;
+
+ pn_session_t *sess = pn_link_session(link->pn_link);
+ if (!sess)
+ return;
+
+ pn_connection_t *conn = pn_session_connection(sess);
+ if (!conn)
+ return;
+
+ dx_connection_t *ctx = pn_connection_get_context(conn);
+ if (!ctx)
+ return;
+
+ dx_server_activate(ctx);
+}
+
+
+void dx_link_close(dx_link_t *link)
+{
+ pn_link_close(link->pn_link);
+}
+
+