From f1c04298943f3fe480ba0240485013b03b6d7f67 Mon Sep 17 00:00:00 2001 From: Stefan Wildemann Date: Fri, 18 Oct 2019 12:01:14 +0200 Subject: enhancement:maptool:improve processing speed and fix some memory holes (#901) * Convert turn relation processing to multi threaded * Enhance item read function to block read the items for faster overall reading speed. * Fix some memory holes found by valgrind. There are still many left. Specially in coastline and Country border code. --- navit/maptool/itembin_buffer.c | 44 +++++++-- navit/maptool/maptool.c | 4 + navit/maptool/maptool.h | 1 + navit/maptool/osm.c | 203 ++++++++++++++++++++++++++++++++++++----- navit/maptool/osm_relations.c | 69 ++++++++++++++ 5 files changed, 292 insertions(+), 29 deletions(-) diff --git a/navit/maptool/itembin_buffer.c b/navit/maptool/itembin_buffer.c index a3d63cf17..5bdf09458 100644 --- a/navit/maptool/itembin_buffer.c +++ b/navit/maptool/itembin_buffer.c @@ -21,19 +21,51 @@ #include "maptool.h" #include "debug.h" - /** Buffer for temporarily storing an item. */ static char misc_item_buffer[20000000]; /** An item_bin for temporary use. */ struct item_bin *tmp_item_bin=(struct item_bin *)(void *)misc_item_buffer; -/** A node_item for temporary use. */ -static struct node_item *tmp_node_item=(struct node_item *)(void *)misc_item_buffer; struct node_item * read_node_item(FILE *in) { - if (fread(tmp_node_item, sizeof(struct node_item), 1, in) != 1) - return NULL; - return tmp_node_item; +#define ITEM_COUNT 1*1024*1024 + /* we read in bigger chunks as file IO in small chunks is slow as hell */ + static FILE * last_in = NULL; + static long last_pos = 0; + static int in_count; + static int out_count; + static struct node_item item_buffer[sizeof(struct node_item) * ITEM_COUNT]; + + struct node_item * retval = NULL; + + if((last_in != in) || (last_pos != ftell(in))) { + if((out_count - in_count) > 0) + fprintf(stderr, "change file. Still %d items\n", out_count - in_count); + /* got new file. flush buffer. */ + in_count=0; + out_count=0; + last_in=in; + } + + /* check if we need to really read from file */ + if ((in_count - out_count) > 0) { + /* no, return item from buffer */ + retval=&(item_buffer[out_count]); + } else { + out_count=0; + in_count=fread(item_buffer, sizeof(struct node_item), ITEM_COUNT, in); + //fprintf(stderr, "read %d items\n", in_count); + if(in_count < 1) { + /* buffer still empty after read */ + return NULL; + } + /* yes, try to read full buffer at once */ + retval=&item_buffer[0]; + } + out_count ++; + last_pos=ftell(in); + + return retval; } struct item_bin * diff --git a/navit/maptool/maptool.c b/navit/maptool/maptool.c index 9c00a9c13..5b2b7d32c 100644 --- a/navit/maptool/maptool.c +++ b/navit/maptool/maptool.c @@ -830,6 +830,8 @@ static void maptool_assemble_map(struct maptool_params *p, char *suffix, char ** map_information_attrs[1].u.str=p->url; } index_init(zip_info, 1); + g_free(zipdir); + g_free(zipindex); } if (!g_strcmp0(suffix,ch_suffix)) { /* Makes compiler happy due to bug 35903 in gcc */ ch_assemble_map(suffix0,suffix,zip_info); @@ -1125,5 +1127,7 @@ int main(int argc, char **argv) { } phase+=2; start_phase(&p,"done"); + if(p.timestamp != NULL) + g_free(p.timestamp); return 0; } diff --git a/navit/maptool/maptool.h b/navit/maptool/maptool.h index f0aee6fdf..5a97c8186 100644 --- a/navit/maptool/maptool.h +++ b/navit/maptool/maptool.h @@ -345,6 +345,7 @@ void relations_add_relation_member_entry(struct relations *rel, struct relations void *member_priv, enum relation_member_type type, osmid id); void relations_add_relation_default_entry(struct relations *rel, struct relations_func *func); void relations_process(struct relations *rel, FILE *nodes, FILE *ways); +void relations_process_multi(struct relations **rel, int count, FILE *nodes, FILE *ways); void relations_destroy(struct relations *rel); diff --git a/navit/maptool/osm.c b/navit/maptool/osm.c index caba43e94..97ae5d1cf 100644 --- a/navit/maptool/osm.c +++ b/navit/maptool/osm.c @@ -2826,6 +2826,7 @@ static int process_multipolygons_find_loops(osmid relid, int in_count, struct it sequence_count = process_multipolygons_find_loop(in_count, parts, sequence, used); if(sequence_count < 0) { done = 1; + g_free(sequence); } else if(sequence_count == 0) { osm_warning("relation",relid,0,"multipolygon: skipping non loop sequence\n"); /* skip empty sequence */ @@ -3260,11 +3261,15 @@ void process_multipolygons(FILE *in, FILE *coords, FILE *ways, FILE *ways_index, if(ways) fseek(ways, 0,SEEK_SET); fprintf(stderr,"process_multipolygons:process (thread %d)\n", i); + /* we could use relations_process_multi here as well, but this would + * use way more memory. */ relations_process(relations[i], coords, ways); fprintf(stderr,"process_multipolygons:finish (thread %d)\n", i); process_multipolygons_finish(multipolygons[i], out); relations_destroy(relations[i]); } + if(multipolygons != NULL) + g_free(multipolygons); g_free(relations); sig_alrm(0); sig_alrm_end(); @@ -3386,6 +3391,8 @@ static void process_turn_restrictions_finish(GList *tr, FILE *out) { } } + /* just for fun...*/ + processed_relations ++; g_free(t->c[0]); g_free(t->c[1]); g_free(t->c[2]); @@ -3395,60 +3402,64 @@ static void process_turn_restrictions_finish(GList *tr, FILE *out) { g_list_free(tr); } -static GList *process_turn_restrictions_setup(FILE *in, struct relations *relations) { +/** + * @brief prepare one multipolygon relation for relattion processing + * + * @param ib the relation + * @param relations the relation processing structure + * @param relations_func function to use for the members + * @param turn_restrictions write the resulting turn_restriction to the list + */ +static void process_turn_restrictions_setup_one(struct item_bin * ib, struct relations * relations, + struct relations_func * relations_func, GList ** turn_restrictions) { struct relation_member fromm,tom,viam,tmpm; long long relid; - struct item_bin *ib; - struct relations_func *relations_func; int min_count; - GList *turn_restrictions=NULL; - fseek(in, 0, SEEK_SET); - relations_func=relations_func_new(process_turn_restrictions_member, NULL); - while ((ib=read_item(in))) { + if(ib != NULL) { struct turn_restriction *turn_restriction; relid=item_bin_get_relationid(ib); min_count=0; if (!search_relation_member(ib, "from",&fromm,&min_count)) { osm_warning("relation",relid,0,"turn restriction: from member missing\n"); - continue; + return; } if (search_relation_member(ib, "from",&tmpm,&min_count)) { osm_warning("relation",relid,0,"turn restriction: multiple from members\n"); - continue; + return; } min_count=0; if (!search_relation_member(ib, "to",&tom,&min_count)) { osm_warning("relation",relid,0,"turn restriction: to member missing\n"); - continue; + return; } if (search_relation_member(ib, "to",&tmpm,&min_count)) { osm_warning("relation",relid,0,"turn restriction: multiple to members\n"); - continue; + return; } min_count=0; if (!search_relation_member(ib, "via",&viam,&min_count)) { osm_warning("relation",relid,0,"turn restriction: via member missing\n"); - continue; + return; } if (search_relation_member(ib, "via",&tmpm,&min_count)) { osm_warning("relation",relid,0,"turn restriction: multiple via member\n"); - continue; + return; } if (fromm.type != rel_member_way) { osm_warning("relation",relid,0,"turn restriction: wrong type for from member "); osm_warning(osm_types[fromm.type],fromm.id,1,"\n"); - continue; + return; } if (tom.type != rel_member_way) { osm_warning("relation",relid,0,"turn restriction: wrong type for to member "); osm_warning(osm_types[tom.type],tom.id,1,"\n"); - continue; + return; } if (viam.type != rel_member_node && viam.type != rel_member_way) { osm_warning("relation",relid,0,"turn restriction: wrong type for via member "); osm_warning(osm_types[viam.type],viam.id,1,"\n"); - continue; + return; } turn_restriction=g_new0(struct turn_restriction, 1); turn_restriction->relid=relid; @@ -3458,19 +3469,165 @@ static GList *process_turn_restrictions_setup(FILE *in, struct relations *relati relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 0, fromm.type, fromm.id); relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 1, viam.type, viam.id); relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 2, tom.type, tom.id); - turn_restrictions=g_list_append(turn_restrictions, turn_restriction); + *turn_restrictions=g_list_append(*turn_restrictions, turn_restriction); + } +} + +/** + * @brief worker thread private storage + */ +struct process_turn_restrictions_setup_thread { + int number; + GAsyncQueue * queue; + struct relations * relations; + struct relations_func * relations_func; + GList* turn_restrictions; + GThread * thread; +}; + +/** + * @brief turn restrictions setup worker thread. + * + * This thread processes any item passed to it via async queue into it's local relations + * function. + * @param data this threads local storage + */ +static gpointer process_turn_restrictions_setup_worker (gpointer data) { + struct item_bin * ib; + //long long relid; + struct process_turn_restrictions_setup_thread * me = (struct process_turn_restrictions_setup_thread*) data; + fprintf(stderr,"worker %d up\n", me->number); + while((ib=g_async_queue_pop (me->queue)) != &killer) { + processed_relations ++; + //relid=item_bin_get_relationid(ib); + //fprintf(stderr,"worker %d processing %lld\n", me->number, relid); + process_turn_restrictions_setup_one(ib, me->relations, me->relations_func, &(me->turn_restrictions)); + /* done with that. Free the item_bin */ + g_free(ib); + } + fprintf(stderr,"worker %d exit\n", me->number); + g_thread_exit(NULL); + return NULL; +} + +/** + * @brief prepare turn restriction way matching + * + * This function reads all turn restriction relations and prepares relations structures + * for later way matching. Since this scales quite ugly, (O^3) i think, we use multiple threads + * creating their own hash each. This way none of the hashes get's that big, and we can utilize + * more cpu power. + * + * @param in file containing the relations + * @param thread_count number of threads to use + * @param relations array of preallocated relations structures. One per thread. + * + * @returns array of GLists. One per thread containing the resulting structures. + */ +static GList ** process_turn_restrictions_setup(FILE *in, int thread_count, struct relations **relations) { + struct process_turn_restrictions_setup_thread *sthread; + + struct item_bin *ib; + struct relations_func *relations_func; + int i; + GList **turn_restrictions=NULL; + /* allocate and reference async queue */ + GAsyncQueue * ib_queue=g_async_queue_new (); + g_async_queue_ref(ib_queue); + /* allocate per thread storage */ + sthread=g_malloc0(sizeof(struct process_turn_restrictions_setup_thread) * thread_count); + + fseek(in, 0, SEEK_SET); + relations_func=relations_func_new(process_turn_restrictions_member, NULL); + + /* start the threads */ + for(i=0; i < thread_count; i ++) { + sthread[i].number = i; + sthread[i].queue = ib_queue; + sthread[i].relations_func = relations_func; + sthread[i].relations = relations[i]; + sthread[i].turn_restrictions = NULL; + sthread[i].thread = g_thread_new ("process_turn_restrictions_setup_worker", process_turn_restrictions_setup_worker, + &(sthread[i])); } + + while ((ib=read_item(in))) { + /* get a duplicate of the returned item, as the one returned shares buffer */ + struct item_bin * dup = item_bin_dup(ib); + //long long relid; + //relid=item_bin_get_relationid(dup); + //fprintf(stderr,"Pushing %lld\n", relid); + /* the dup's will be freed by the thread processing them*/ + g_async_queue_push(ib_queue,dup); + /* limit queue size. This is ugly, but since GAsyncQueue doesn't support + * push to block when the queue reached a decent size, I help myself + * with this ugly hack */ + while(g_async_queue_length(ib_queue) > 1000) + usleep(200); + } + + /* stop iand join all remaining threads */ + for(i = 0; i < thread_count; i ++) + g_async_queue_push(ib_queue,&killer); + for(i=0; i < thread_count; i ++) + g_thread_join(sthread[i].thread); + + /* rescue the resulting glist */ + turn_restrictions = g_malloc0(sizeof(GList *) * thread_count); + for(i =0; i < thread_count; i ++) + turn_restrictions[i]=sthread[i].turn_restrictions; + + /* free the thread storage */ + g_free(sthread); + + /* release the queue */ + g_async_queue_unref(ib_queue); + + /* return the list of turn_restrictions */ return turn_restrictions; } void process_turn_restrictions(FILE *in, FILE *coords, FILE *ways, FILE *ways_index, FILE *out) { - struct relations *relations=relations_new(); - GList *turn_restrictions; + /* thread count is from maptool.c as commandline parameter */ + int i; + struct relations **relations; + GList **turn_restrictions = NULL; + sig_alrm(0); + + relations = g_malloc0(sizeof(struct relations *) * thread_count); + for(i=0; i < thread_count; i ++) + relations[i] = relations_new(); fseek(in, 0, SEEK_SET); - turn_restrictions=process_turn_restrictions_setup(in, relations); - relations_process(relations, coords, ways); - process_turn_restrictions_finish(turn_restrictions, out); - relations_destroy(relations); + fprintf(stderr,"process_turn_restrictions:setup (threads %d)\n", thread_count); + turn_restrictions=process_turn_restrictions_setup(in,thread_count,relations); + /* Here we get an array of resulting relations structures and resultin + * GLists. + * Of course we need to iterate the ways multiple times, but that's fast + * compared to hashing the relations structures + * This even saves a lot of main memory, as we can process every result from + * every thread at once completely. Since we know it's self containing + */ + sig_alrm(0); + processed_relations=0; + processed_ways=0; + sig_alrm(0); + if(coords) + fseek(coords, 0,SEEK_SET); + if(ways) + fseek(ways, 0,SEEK_SET); + fprintf(stderr,"process_multipolygons:process (thread %d)\n", i); + relations_process_multi(relations, thread_count, coords, ways); + for( i=0; i < thread_count; i ++) { + + fprintf(stderr,"process_turn_restrictions:finish (thread %d)\n", i); + process_turn_restrictions_finish(turn_restrictions[i], out); + relations_destroy(relations[i]); + } + if(turn_restrictions != NULL) + g_free(turn_restrictions); + g_free(relations); + sig_alrm(0); + sig_alrm_end(); } #if 0 void process_turn_restrictions_old(FILE *in, FILE *coords, FILE *ways, FILE *ways_index, FILE *out) { diff --git a/navit/maptool/osm_relations.c b/navit/maptool/osm_relations.c index a6084f21e..3d05caab7 100644 --- a/navit/maptool/osm_relations.c +++ b/navit/maptool/osm_relations.c @@ -164,6 +164,67 @@ void relations_process(struct relations *rel, FILE *nodes, FILE *ways) { } } +/* + * @brief The actual relations processing: Loop through raw data and process any relations members. + * This function reads through all nodes and ways passed in, and looks up each item in the + * relations collection. For each relation member found, its processing function is called. + * @param in rel relations collection storing pre-processed relations. Built using relations_add_relation_member_entry. + * @param in nodes file containing nodes in "coords.tmp" format + * @param in ways file containing items in item_bin format. This file may contain both nodes, ways, and relations in that format. + */ +void relations_process_multi(struct relations **rel, int count, FILE *nodes, FILE *ways) { + char buffer[128]; + struct item_bin *ib=(struct item_bin *)buffer; + osmid *id; + struct coord *c=(struct coord *)(ib+1),cn= {0,0}; + struct node_item *ni; + GList *l; + + if(count <= 0) + return; + + if (nodes) { + item_bin_init(ib, type_point_unkn); + item_bin_add_coord(ib, &cn, 1); + item_bin_add_attr_longlong(ib, attr_osm_nodeid, 0); + id=item_bin_get_attr(ib, attr_osm_nodeid, NULL); + while ((ni=read_node_item(nodes))) { + int i; + *id=ni->nd_id; + *c=ni->c; + for(i=0; i < count; i ++) { + l=g_hash_table_lookup(rel[i]->member_hash[0], id); + while (l) { + struct relations_member *memb=l->data; + memb->func->func(memb->func->func_priv, memb->relation_priv, ib, memb->member_priv); + l=g_list_next(l); + } + } + } + } + if (ways) { + while ((ib=read_item(ways))) { + int i; + for(i=0; i < count; i ++) { + l=NULL; + if(NULL!=(id=item_bin_get_attr(ib, attr_osm_nodeid, NULL))) + l=g_hash_table_lookup(rel[i]->member_hash[0], id); + else if(NULL!=(id=item_bin_get_attr(ib, attr_osm_wayid, NULL))) + l=g_hash_table_lookup(rel[i]->member_hash[1], id); + else if(NULL!=(id=item_bin_get_attr(ib, attr_osm_relationid, NULL))) + l=g_hash_table_lookup(rel[i]->member_hash[2], id); + if(!l) + l=rel[i]->default_members; + while (l) { + struct relations_member *memb=l->data; + memb->func->func(memb->func->func_priv, memb->relation_priv, ib, memb->member_priv); + l=g_list_next(l); + } + } + } + } +} + static void relations_destroy_func(void *key, GList *l, void *data) { GList *ll=l; while (ll) { @@ -180,5 +241,13 @@ void relations_destroy(struct relations *relations) { g_hash_table_foreach(relations->member_hash[i], (GHFunc)relations_destroy_func, NULL); g_hash_table_destroy(relations->member_hash[i]); } + if(relations->default_members != NULL) { + GList *ll=relations->default_members; + while (ll) { + g_free(ll->data); + ll=g_list_next(ll); + } + g_list_free(relations->default_members); + } g_free(relations); } -- cgit v1.2.1