summaryrefslogtreecommitdiff
path: root/examples/sql/c/ex_sql_multi_thread.c
blob: 83e641a39a7de3376089c73f095c66a538e6ae66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1997, 2015 Oracle and/or its affiliates.  All rights reserved.
 *
 */

#include "ex_sql_utils.h"

/*
 * Create dozens of writer threads to insert data in parallel.
 * DBSQL will not call the busy callback. It blocks instead.
 */

typedef struct {
	const char* db_name;        /* The filename of db. */
	int num_of_records;   /* The number of records to insert. */
	int thread_sn;              /* Serial number of thread. */
} thread_attr;

/*
 * Define the writer thread's workload.
 * The writer would insert 5000 records in its thread. Commit if succeeded 
 * and rollback if failed.
 */
static void*
writer(arg)
	void *arg;
{
	const char* sql;
	int txn_begin;
	int num_of_records;
	int thread_sn;
	int i, rc;
	sqlite3* db;
	sqlite3_stmt* stmt;

	txn_begin = 0; /* Mark that explicit txn does not begin yet. */

	/* Open database. */
	sqlite3_open(((thread_attr *)arg)->db_name, &db);
	error_handler(db);

	/* Fetch attributes. */
	num_of_records = ((thread_attr *)arg)->num_of_records;
	thread_sn = ((thread_attr *)arg)->thread_sn;

	/* Prepare the statement for use, many times over. */
	sql = "INSERT INTO university VALUES"
	      "(147, 'Tsinghua University China', 'tsinghua.edu.cn',"
	      "'cn', 'Asia', 237,63,432,303);";
	rc = sqlite3_prepare_v2(db, sql, (int)strlen(sql), &stmt, NULL);
	if (rc != SQLITE_OK)
		goto cleanup;

	/* 
	 * When we insert data many times over, we shall use explicit
	 * transaction to speed up the operations.
	 */
	rc = sqlite3_exec(db, "BEGIN TRANSACTION", NULL, 0, NULL);
	if (rc != SQLITE_OK)
		goto cleanup;
	txn_begin = 1; /* Mark that explicit txn began. */

	for (i = 0; i < num_of_records; i++) {
		rc = sqlite3_step(stmt);
		/*
		 * Even if we encounter errors, the statement still has 
		 * to be reset. Otherwise following rollback always 
		 * hits SQLITE_BUSY 
		 */
		sqlite3_reset(stmt);
		if (rc != SQLITE_DONE) {
			/* We can not return here. Rollback is required. */
			goto cleanup;
			break;
		}
	}

	/* Commit if no errors. */
	rc = sqlite3_exec(db, "COMMIT TRANSACTION", NULL, 0, NULL);
	if (rc != SQLITE_OK)
		goto cleanup;

cleanup:
	/* Error handle. */
	if (rc != SQLITE_OK && rc != SQLITE_DONE) {
		fprintf(stderr, "ERROR: %s. ERRCODE: %d.\n",
			sqlite3_errmsg(db), rc);
		/* Rollback if explict txn had begined. */
		if (txn_begin)
			sqlite3_exec(db, "ROLLBACK TRANSACTION", NULL, 0, NULL);
	}

	/* Final cleanup. */
	sqlite3_finalize(stmt);

	sqlite3_close(db);
	return NULL;
}

/* Example body. */
static int
ex_sql_multi_thread(db, db_name)
	db_handle *db;
	const char* db_name;
{
	const char* sql;
	int nthreads;
	int ninsert;
	int i;
	thread_attr attr;
	os_thread_t pid;

	nthreads = 20;
	ninsert  = 5000;

	/* Display current status. */
	echo_info("Check existing record number of the table");
	sql = "SELECT count(*) FROM university;"; 
	exec_sql(db, sql);

	/*
	 * Create n threads and write in parallel.
	 */
	echo_info("Now we begin to insert records by multi-writers.");
	attr.db_name = db_name;
	attr.num_of_records = ninsert;

	for (i = 0; i < nthreads; i++) {
		attr.thread_sn = i;
		if (os_thread_create(&pid, writer, (void *)&attr)) {
			register_thread_id(pid);
			printf("%02dth writer starts to write %d rows\n",
				i + 1, ninsert);
			sqlite3_sleep(20);	/* Milliseconds. */
		} else {
			fprintf(stderr, "Failed to create thread\n");
		}
	}
	join_threads();

	/* Display result. */
	echo_info("Check existing record number of the table");
	sql = "SELECT count(*) FROM university;"; 
	exec_sql(db, sql);

	return 0;
}

int
main()
{
	db_handle *db;
	const char* db_name = "ex_sql_multi_thread.db";

	/* Check if current lib is threadsafe. */
	if(!sqlite3_threadsafe()) {
		fprintf(stderr,
			"ERROR: The libsqlite version is NOT threadsafe!\n");
		exit(EXIT_FAILURE);
	}

	/* Setup environment and preload data. */
	db = setup(db_name);
	load_table_from_file(db, university_sample_data, 1/* Silent */);

	/* Run example. */
	ex_sql_multi_thread(db, db_name);

	/* End. */
	cleanup(db);
	return 0;
}