summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNguyễn Thái Ngọc Duy <pclouds@gmail.com>2012-03-12 09:32:29 +0700
committerJunio C Hamano <gitster@pobox.com>2012-03-11 23:37:04 -0700
commitc32a1bf818fe7304d0e1868f3d2316132512276d (patch)
treedcc41e70578b60d5f212d7b04b9399bea7dd01ab
parent8e8aa58784996b2556edc9d2da0889c887dec50e (diff)
downloadgit-nd/threaded-index-pack.tar.gz
index-pack: support multithreaded delta resolvingnd/threaded-index-pack
This puts delta resolving on each base on a separate thread, one base cache per thread. Per-thread data is grouped in struct thread_local. When running with nr_threads == 1, no pthreads calls are made. The system essentially runs in non-thread mode. An experiment on a Xeon 24 core machine with linux-2.6.git shows that performance does not increase proportional to the number of cores. So by default, we use maximum 3 cores. Some numbers with --threads from 1 to 16: 1..4 real 1m16.310s 0m48.183s 0m37.866s 0m32.834s user 1m13.773s 1m15.537s 1m15.781s 1m16.233s sys 0m2.480s 0m3.936s 0m4.448s 0m4.852s 5..8 real 0m33.170s 0m30.369s 0m28.406s 0m26.968s user 1m31.474s 1m30.322s 1m29.562s 1m28.694s sys 0m6.096s 0m6.268s 0m6.684s 0m7.172s 9..12 real 0m26.288s 0m26.207s 0m26.239s 0m24.945s user 1m29.530s 1m36.146s 1m43.134s 1m34.182s sys 0m8.129s 0m8.437s 0m9.697s 0m10.201s 13..16 real 0m25.110s 0m25.043s 0m23.955s 0m25.746s user 1m39.262s 1m43.598s 1m38.350s 1m59.775s sys 0m10.997s 0m11.553s 0m11.949s 0m13.689s Thanks to Ramsay Jones for troubleshooting on MinGW platform. Signed-off-by: Nguyễn Thái Ngọc Duy <pclouds@gmail.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
-rw-r--r--Documentation/git-index-pack.txt10
-rw-r--r--Makefile2
-rw-r--r--builtin/index-pack.c198
3 files changed, 192 insertions, 18 deletions
diff --git a/Documentation/git-index-pack.txt b/Documentation/git-index-pack.txt
index 909687fed4..39e6d0ddd8 100644
--- a/Documentation/git-index-pack.txt
+++ b/Documentation/git-index-pack.txt
@@ -74,6 +74,16 @@ OPTIONS
--strict::
Die, if the pack contains broken objects or links.
+--threads=<n>::
+ Specifies the number of threads to spawn when resolving
+ deltas. This requires that index-pack be compiled with
+ pthreads otherwise this option is ignored with a warning.
+ This is meant to reduce packing time on multiprocessor
+ machines. The required amount of memory for the delta search
+ window is however multiplied by the number of threads.
+ Specifying 0 will cause git to auto-detect the number of CPU's
+ and use maximum 3 threads.
+
Note
----
diff --git a/Makefile b/Makefile
index cf2c40b44f..e41955ff95 100644
--- a/Makefile
+++ b/Makefile
@@ -2160,7 +2160,7 @@ builtin/branch.o builtin/checkout.o builtin/clone.o builtin/reset.o branch.o tra
builtin/bundle.o bundle.o transport.o: bundle.h
builtin/bisect--helper.o builtin/rev-list.o bisect.o: bisect.h
builtin/clone.o builtin/fetch-pack.o transport.o: fetch-pack.h
-builtin/grep.o builtin/pack-objects.o transport-helper.o thread-utils.o: thread-utils.h
+builtin/index-pack.o builtin/grep.o builtin/pack-objects.o transport-helper.o thread-utils.o: thread-utils.h
builtin/send-pack.o transport.o: send-pack.h
builtin/log.o builtin/shortlog.o: shortlog.h
builtin/prune.o builtin/reflog.o reachable.o: reachable.h
diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 918684f792..7e3b287023 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -9,6 +9,7 @@
#include "progress.h"
#include "fsck.h"
#include "exec_cmd.h"
+#include "thread-utils.h"
static const char index_pack_usage[] =
"git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -38,6 +39,14 @@ struct base_data {
int ofs_first, ofs_last;
};
+struct thread_local {
+#ifndef NO_PTHREADS
+ pthread_t thread;
+#endif
+ struct base_data *base_cache;
+ size_t base_cache_used;
+};
+
/*
* Even if sizeof(union delta_base) == 24 on 64-bit archs, we really want
* to memcmp() only the first 20 bytes.
@@ -54,11 +63,12 @@ struct delta_entry {
static struct object_entry *objects;
static struct delta_entry *deltas;
-static struct base_data *base_cache;
-static size_t base_cache_used;
+static struct thread_local nothread_data;
static int nr_objects;
+static int nr_processed;
static int nr_deltas;
static int nr_resolved_deltas;
+static int nr_threads;
static int from_stdin;
static int strict;
@@ -75,6 +85,72 @@ static git_SHA_CTX input_ctx;
static uint32_t input_crc32;
static int input_fd, output_fd, pack_fd;
+#ifndef NO_PTHREADS
+
+static struct thread_local *thread_data;
+
+static pthread_mutex_t read_mutex;
+#define read_lock() lock_mutex(&read_mutex)
+#define read_unlock() unlock_mutex(&read_mutex)
+
+static pthread_mutex_t counter_mutex;
+#define counter_lock() lock_mutex(&counter_mutex)
+#define counter_unlock() unlock_mutex(&counter_mutex)
+
+static pthread_mutex_t work_mutex;
+#define work_lock() lock_mutex(&work_mutex)
+#define work_unlock() unlock_mutex(&work_mutex)
+
+static pthread_key_t key;
+
+static inline void lock_mutex(pthread_mutex_t *mutex)
+{
+ if (nr_threads > 1)
+ pthread_mutex_lock(mutex);
+}
+
+static inline void unlock_mutex(pthread_mutex_t *mutex)
+{
+ if (nr_threads > 1)
+ pthread_mutex_unlock(mutex);
+}
+
+/*
+ * Mutex and conditional variable can't be statically-initialized on Windows.
+ */
+static void init_thread(void)
+{
+ init_recursive_mutex(&read_mutex);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_mutex_init(&work_mutex, NULL);
+ pthread_key_create(&key, NULL);
+ thread_data = xcalloc(nr_threads, sizeof(*thread_data));
+}
+
+static void cleanup_thread(void)
+{
+ pthread_mutex_destroy(&read_mutex);
+ pthread_mutex_destroy(&counter_mutex);
+ pthread_mutex_destroy(&work_mutex);
+ pthread_key_delete(key);
+ nr_threads = 1;
+ free(thread_data);
+}
+
+#else
+
+#define read_lock()
+#define read_unlock()
+
+#define counter_lock()
+#define counter_unlock()
+
+#define work_lock()
+#define work_unlock()
+
+#endif
+
+
static int mark_link(struct object *obj, int type, void *data)
{
if (!obj)
@@ -223,6 +299,17 @@ static NORETURN void bad_object(unsigned long offset, const char *format, ...)
die("pack has bad object at offset %lu: %s", offset, buf);
}
+static struct thread_local *get_thread_data(void)
+{
+#ifndef NO_PTHREADS
+ if (nr_threads > 1)
+ return pthread_getspecific(key);
+#endif
+ assert(nr_threads == 1 &&
+ "This should only be reached when all threads are gone");
+ return &nothread_data;
+}
+
static struct base_data *alloc_base_data(void)
{
struct base_data *base = xmalloc(sizeof(struct base_data));
@@ -237,15 +324,16 @@ static void free_base_data(struct base_data *c)
if (c->data) {
free(c->data);
c->data = NULL;
- base_cache_used -= c->size;
+ get_thread_data()->base_cache_used -= c->size;
}
}
static void prune_base_data(struct base_data *retain)
{
struct base_data *b;
- for (b = base_cache;
- base_cache_used > delta_base_cache_limit && b;
+ struct thread_local *data = get_thread_data();
+ for (b = data->base_cache;
+ data->base_cache_used > delta_base_cache_limit && b;
b = b->child) {
if (b->data && b != retain)
free_base_data(b);
@@ -257,12 +345,12 @@ static void link_base_data(struct base_data *base, struct base_data *c)
if (base)
base->child = c;
else
- base_cache = c;
+ get_thread_data()->base_cache = c;
c->base = base;
c->child = NULL;
if (c->data)
- base_cache_used += c->size;
+ get_thread_data()->base_cache_used += c->size;
prune_base_data(c);
}
@@ -272,7 +360,7 @@ static void unlink_base_data(struct base_data *c)
if (base)
base->child = NULL;
else
- base_cache = NULL;
+ get_thread_data()->base_cache = NULL;
free_base_data(c);
}
@@ -461,19 +549,24 @@ static void sha1_object(const void *data, unsigned long size,
enum object_type type, unsigned char *sha1)
{
hash_sha1_file(data, size, typename(type), sha1);
+ read_lock();
if (has_sha1_file(sha1)) {
void *has_data;
enum object_type has_type;
unsigned long has_size;
has_data = read_sha1_file(sha1, &has_type, &has_size);
+ read_unlock();
if (!has_data)
die("cannot read existing object %s", sha1_to_hex(sha1));
if (size != has_size || type != has_type ||
memcmp(data, has_data, size) != 0)
die("SHA1 COLLISION FOUND WITH %s !", sha1_to_hex(sha1));
free(has_data);
- }
+ } else
+ read_unlock();
+
if (strict) {
+ read_lock();
if (type == OBJ_BLOB) {
struct blob *blob = lookup_blob(sha1);
if (blob)
@@ -507,6 +600,7 @@ static void sha1_object(const void *data, unsigned long size,
}
obj->flags |= FLAG_CHECKED;
}
+ read_unlock();
}
}
@@ -552,7 +646,7 @@ static void *get_base_data(struct base_data *c)
if (!delta_nr) {
c->data = get_data_from_pack(obj);
c->size = obj->size;
- base_cache_used += c->size;
+ get_thread_data()->base_cache_used += c->size;
prune_base_data(c);
}
for (; delta_nr > 0; delta_nr--) {
@@ -568,7 +662,7 @@ static void *get_base_data(struct base_data *c)
free(raw);
if (!c->data)
bad_object(obj->idx.offset, "failed to apply delta");
- base_cache_used += c->size;
+ get_thread_data()->base_cache_used += c->size;
prune_base_data(c);
}
free(delta);
@@ -596,7 +690,9 @@ static void resolve_delta(struct object_entry *delta_obj,
bad_object(delta_obj->idx.offset, "failed to apply delta");
sha1_object(result->data, result->size, delta_obj->real_type,
delta_obj->idx.sha1);
+ counter_lock();
nr_resolved_deltas++;
+ counter_unlock();
}
static struct base_data *find_unresolved_deltas_1(struct base_data *base,
@@ -696,7 +792,31 @@ static void second_pass(struct object_entry *obj)
base_obj->obj = obj;
base_obj->data = NULL;
find_unresolved_deltas(base_obj);
- display_progress(progress, nr_resolved_deltas);
+}
+
+static void *threaded_second_pass(void *arg)
+{
+#ifndef NO_PTHREADS
+ if (nr_threads > 1)
+ pthread_setspecific(key, arg);
+#endif
+ for (;;) {
+ int i;
+ work_lock();
+ display_progress(progress, nr_resolved_deltas);
+ while (nr_processed < nr_objects &&
+ is_delta_type(objects[nr_processed].type))
+ nr_processed++;
+ if (nr_processed >= nr_objects) {
+ work_unlock();
+ break;
+ }
+ i = nr_processed++;
+ work_unlock();
+
+ second_pass(&objects[i]);
+ }
+ return NULL;
}
/* Parse all objects and return the pack content SHA1 hash */
@@ -755,13 +875,25 @@ static void parse_pack_objects(unsigned char *sha1)
if (verbose)
progress = start_progress("Resolving deltas", nr_deltas);
- for (i = 0; i < nr_objects; i++) {
- struct object_entry *obj = &objects[i];
- if (is_delta_type(obj->type))
- continue;
- second_pass(obj);
+ nr_processed = 0;
+#ifndef NO_PTHREADS
+ if (nr_threads > 1) {
+ init_thread();
+ for (i = 0; i < nr_threads; i++) {
+ int ret = pthread_create(&thread_data[i].thread, NULL,
+ threaded_second_pass, thread_data + i);
+ if (ret)
+ die("unable to create thread: %s", strerror(ret));
+ }
+ for (i = 0; i < nr_threads; i++)
+ pthread_join(thread_data[i].thread, NULL);
+
+ cleanup_thread();
+ return;
}
+#endif
+ threaded_second_pass(NULL);
}
static int write_compressed(struct sha1file *f, void *in, unsigned int size)
@@ -967,6 +1099,18 @@ static int git_index_pack_config(const char *k, const char *v, void *cb)
die("bad pack.indexversion=%"PRIu32, opts->version);
return 0;
}
+ if (!strcmp(k, "pack.threads")) {
+ nr_threads = git_config_int(k, v);
+ if (nr_threads < 0)
+ die("invalid number of threads specified (%d)",
+ nr_threads);
+#ifdef NO_PTHREADS
+ if (nr_threads != 1)
+ warning("no threads support, ignoring %s", k);
+ nr_threads = 1;
+#endif
+ return 0;
+ }
return git_default_config(k, v, cb);
}
@@ -1125,6 +1269,17 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix)
keep_msg = "";
} else if (!prefixcmp(arg, "--keep=")) {
keep_msg = arg + 7;
+ } else if (!prefixcmp(arg, "--threads=")) {
+ char *end;
+ nr_threads = strtoul(arg+10, &end, 0);
+ if (!arg[10] || *end || nr_threads < 0)
+ usage(index_pack_usage);
+#ifdef NO_PTHREADS
+ if (nr_threads != 1)
+ warning("no threads support, "
+ "ignoring %s", arg);
+ nr_threads = 1;
+#endif
} else if (!prefixcmp(arg, "--pack_header=")) {
struct pack_header *hdr;
char *c;
@@ -1196,6 +1351,15 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix)
if (strict)
opts.flags |= WRITE_IDX_STRICT;
+#ifndef NO_PTHREADS
+ if (!nr_threads) {
+ nr_threads = online_cpus();
+ /* An experiment showed that more threads does not mean faster */
+ if (nr_threads > 3)
+ nr_threads = 3;
+ }
+#endif
+
curr_pack = open_pack_file(pack_name);
parse_pack_header();
objects = xcalloc(nr_objects + 1, sizeof(struct object_entry));