summaryrefslogtreecommitdiff
path: root/examples/threads.py
blob: 41ff21f3a7d8f159825f8583db987d13f84a09c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# threads.py -- example of multiple threads using psycopg
# -*- encoding: latin1 -*-
#
# Copyright (C) 2001-2010 Federico Di Gregorio  <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
# License for more details.

## put in DSN your DSN string
## (it can also be given as the first command-line argument)

DSN = 'dbname=test'

## some other parameters

# one insert/select thread is created per entry; each entry becomes the
# suffix of the thread name ('Thread-A', 'Thread-1', ...)
INSERT_THREADS = ('A', 'B', 'C')
SELECT_THREADS = ('1', '2')

# number of rows inserted by each insert thread
ROWS = 1000

# insert threads commit every COMMIT_STEP rows
COMMIT_STEP = 20
# select threads count from 0 to SELECT_SIZE and run one SELECT every
# SELECT_STEP counts; the counter divided by SELECT_DIV is the upper bound
# used in the "value2 < ..." condition of that SELECT
SELECT_SIZE = 10000
SELECT_STEP = 500
SELECT_DIV  = 250

# the available modes are:
# 0 - one connection for all insert and one for all select threads
# 1 - connections generated using the connection pool
# (the mode can also be given as the second command-line argument)

MODE = 1

## don't modify anything below this line (except for experimenting)

import sys, psycopg2, threading
from psycopg2.pool import ThreadedConnectionPool

## optional command-line overrides: argv[1] replaces DSN, argv[2] replaces MODE
if len(sys.argv) > 1:
    DSN = sys.argv[1]
if len(sys.argv) > 2:
    MODE = int(sys.argv[2])

print("Opening connection using dsn: " + DSN)
conn = psycopg2.connect(DSN)
curs = conn.cursor()

## (re)create the table all the threads work on
CREATE_TEST_TABLE = """CREATE TABLE test_threads (
                        name text, value1 int4, value2 float)"""

try:
    curs.execute(CREATE_TEST_TABLE)
except psycopg2.ProgrammingError:
    # most likely the table is left over from a previous run: the failed
    # statement aborted the transaction, so roll back before dropping the
    # old table and creating a fresh one.  Anything that is not an SQL-level
    # error (lost connection, Ctrl-C, ...) is deliberately not caught here.
    conn.rollback()
    curs.execute("DROP TABLE test_threads")
    curs.execute(CREATE_TEST_TABLE)
conn.commit()


## this function inserts a big number of rows and creates and destroys
## a large number of cursors

def insert_func(conn_or_pool, rows):
    """Insert `rows` rows into test_threads, one new cursor per row.

    conn_or_pool -- a connection shared with the other insert threads when
                    MODE is 0, otherwise a connection pool (any non-zero
                    MODE is treated as pool mode, as in the module-level
                    setup code)
    rows         -- how many rows this thread inserts

    The work is committed every COMMIT_STEP rows; in pool mode the
    connection is also handed back to the pool and a new one requested at
    each of those steps.  Whatever happens, a connection taken from the pool
    is returned to it before the function exits.
    """
    name = threading.currentThread().getName()
    pooled = (MODE != 0)

    if pooled:
        conn = conn_or_pool.getconn()
    else:
        conn = conn_or_pool

    try:
        for i in range(rows):
            if i % COMMIT_STEP == 0:
                conn.commit()
                if pooled:
                    conn_or_pool.putconn(conn)
                    # nothing is held while the message is printed; this
                    # also keeps the finally clause from returning the same
                    # connection twice if getconn() below fails
                    conn = None
                s = name + ": COMMIT STEP " + str(i)
                print(s)
                if pooled:
                    conn = conn_or_pool.getconn()

            # a fresh cursor for every row: creating lots of them is part
            # of the exercise
            c = conn.cursor()
            try:
                c.execute("INSERT INTO test_threads VALUES (%s, %s, %s)",
                          (str(i), i, float(i)))
            except psycopg2.ProgrammingError as err:
                print(name + " : an error occurred; skipping this insert")
                print(err)
                # the failed statement leaves the transaction aborted and
                # the server refuses further commands until it is rolled
                # back, so without this every following insert would fail
                conn.rollback()
        conn.commit()
    finally:
        if pooled and conn is not None:
            conn_or_pool.putconn(conn)

## a nice select function that prints the current number of rows in the
## database (and transfers them, putting some pressure on the network)
    
def select_func(conn_or_pool, z):
    """Repeatedly fetch rows from test_threads and print how many came back.

    conn_or_pool -- a connection shared with the other select threads when
                    MODE is 0, otherwise a connection pool (any non-zero
                    MODE is treated as pool mode, as in the module-level
                    setup code)
    z            -- divisor applied to the loop counter to obtain the upper
                    bound used in the "value2 < ..." condition

    One SELECT is issued for every multiple of SELECT_STEP below
    SELECT_SIZE.  In pool mode a connection is requested for each SELECT and
    always given back afterwards, even when the query fails.
    """
    name = threading.currentThread().getName()
    pooled = (MODE != 0)

    if not pooled:
        conn = conn_or_pool
        conn.set_isolation_level(0)

    for i in range(0, SELECT_SIZE, SELECT_STEP):
        try:
            if pooled:
                conn = conn_or_pool.getconn()
            try:
                if pooled:
                    # NOTE(review): the isolation level set here appears to
                    # stay on the connection after it goes back to the pool,
                    # so insert threads may later receive it in this state;
                    # confirm that this is intended
                    conn.set_isolation_level(0)
                c = conn.cursor()
                c.execute("SELECT * FROM test_threads WHERE value2 < %s",
                          (int(i / z),))
                fetched = c.fetchall()
            finally:
                # hand the connection back on failure too, otherwise each
                # failed query would keep one pool slot busy for good
                if pooled:
                    conn_or_pool.putconn(conn)
            s = name + ": number of rows fetched: " + str(len(fetched))
            print(s)
        except psycopg2.ProgrammingError as err:
            print(name + " : an error occurred; skipping this select")
            print(err)

## create the connection pool or the connections: in mode 0 the insert
## threads share one connection and the select threads share another one,
## otherwise every thread gets its connections from a single pool
if MODE == 0:
    conn_insert = psycopg2.connect(DSN)
    conn_select = psycopg2.connect(DSN)
else:
    # at most one connection per thread, half of that as the pool minimum;
    # floor division keeps the minimum an integer even where "/" is true
    # division
    m = len(INSERT_THREADS) + len(SELECT_THREADS)
    n = m // 2
    conn_insert = conn_select = ThreadedConnectionPool(n, m, DSN)
    
## create the threads: nothing runs until they are started further down
threads = []

# one entry per kind of thread: the message announcing the group, the name
# suffixes, the function the threads run and the arguments it receives
thread_groups = (
    ("Creating INSERT threads:", INSERT_THREADS, insert_func,
     (conn_insert, ROWS)),
    ("Creating SELECT threads:", SELECT_THREADS, select_func,
     (conn_select, SELECT_DIV)),
)

for banner, suffixes, worker, worker_args in thread_groups:
    print(banner)
    for suffix in suffixes:
        t = threading.Thread(target=worker, name='Thread-' + suffix,
                             args=worker_args)
        t.setDaemon(0)
        threads.append(t)

## really start the threads now
for t in threads:
    t.start()

## and wait for them to finish
for t in threads:
    t.join()
    print("%s exited OK" % t.getName())


## report how many rows ended up in the table, then remove it

# end whatever transaction is still open on the main connection before
# counting
conn.commit()
curs.execute("SELECT count(name) FROM test_threads")
print("Inserted %s rows." % curs.fetchone()[0])

curs.execute("DROP TABLE test_threads")
conn.commit()

## release everything opened by this script instead of leaving it to
## interpreter shutdown
curs.close()
conn.close()
if MODE == 0:
    conn_insert.close()
    conn_select.close()
else:
    # conn_insert and conn_select are the same pool object in this mode
    conn_insert.closeall()