diff options
Diffstat (limited to 'psycopg/connection_int.c')
-rw-r--r-- | psycopg/connection_int.c | 290 |
1 files changed, 205 insertions, 85 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 22c5bc5..441d362 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -34,6 +34,19 @@ #include <string.h> +/* Mapping from isolation level name to value exposed by Python. + * Only used for backward compatibility by the isolation_level property */ + +const IsolationLevel conn_isolevels[] = { + {"", 0}, /* autocommit */ + {"read uncommitted", 1}, + {"read committed", 2}, + {"repeatable read", 3}, + {"serializable", 4}, + {"default", -1}, + { NULL } +}; + /* Return a new "string" from a char* from the database. * @@ -82,6 +95,10 @@ conn_notice_callback(void *args, const char *message) self->notice_pending = notice; } +/* Expose the notices received as Python objects. + * + * The function should be called with the connection lock and the GIL. + */ void conn_notice_process(connectionObject *self) { @@ -92,10 +109,6 @@ conn_notice_process(connectionObject *self) return; } - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); - Py_BLOCK_THREADS; - notice = self->notice_pending; nnotices = PyList_GET_SIZE(self->notice_list); @@ -119,10 +132,6 @@ conn_notice_process(connectionObject *self) 0, nnotices - CONN_NOTICES_LIMIT); } - Py_UNBLOCK_THREADS; - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; - conn_notice_clean(self); } @@ -130,8 +139,6 @@ void conn_notice_clean(connectionObject *self) { struct connectionObject_notice *tmp, *notice; - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); notice = self->notice_pending; @@ -141,11 +148,8 @@ conn_notice_clean(connectionObject *self) free((void*)tmp->message); free(tmp); } - + self->notice_pending = NULL; - - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; } @@ -161,8 +165,6 @@ conn_notifies_process(connectionObject *self) PyObject *notify = NULL; PyObject *pid = NULL, *channel = NULL, *payload = NULL; - /* TODO: we are called without the lock! */ - while ((pgn = PQnotifies(self->pgconn)) != NULL) { Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s", @@ -358,26 +360,57 @@ exit: return rv; } + int -conn_get_isolation_level(PGresult *pgres) +conn_get_isolation_level(connectionObject *self) { - static const char lvl1a[] = "read uncommitted"; - static const char lvl1b[] = "read committed"; - int rv; + PGresult *pgres = NULL; + char *error = NULL; + int rv = -1; + char *lname; + const IsolationLevel *level; + + /* this may get called by async connections too: here's your result */ + if (self->autocommit) { + return 0; + } + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); - char *isolation_level = PQgetvalue(pgres, 0, 0); + if (!(lname = pq_get_guc_locked(self, "default_transaction_isolation", + &pgres, &error, &_save))) { + goto endlock; + } - if ((strcmp(lvl1b, isolation_level) == 0) /* most likely */ - || (strcmp(lvl1a, isolation_level) == 0)) - rv = ISOLATION_LEVEL_READ_COMMITTED; - else /* if it's not one of the lower ones, it's SERIALIZABLE */ - rv = ISOLATION_LEVEL_SERIALIZABLE; + /* find the value for the requested isolation level */ + level = conn_isolevels; + while ((++level)->name) { + if (0 == strcasecmp(level->name, lname)) { + rv = level->value; + break; + } + } + if (-1 == rv) { + error = malloc(256); + PyOS_snprintf(error, 256, + "unexpected isolation level: '%s'", lname); + } - CLEARPGRES(pgres); + free(lname); + +endlock: + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + + if (rv < 0) { + pq_complete_error(self, &pgres, &error); + } return rv; } + int conn_get_protocol_version(PGconn *pgconn) { @@ -425,8 +458,8 @@ conn_is_datestyle_ok(PGconn *pgconn) int conn_setup(connectionObject *self, PGconn *pgconn) { - PGresult *pgres; - int green; + PGresult *pgres = NULL; + char *error = NULL; self->equote = conn_get_standard_conforming_strings(pgconn); self->server_version = conn_get_server_version(pgconn); @@ -450,51 +483,24 @@ conn_setup(connectionObject *self, PGconn *pgconn) pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; - green = psyco_green(); - - if (green && (pq_set_non_blocking(self, 1, 1) != 0)) { + if (psyco_green() && (pq_set_non_blocking(self, 1, 1) != 0)) { return -1; } if (!conn_is_datestyle_ok(self->pgconn)) { - if (!green) { - Py_UNBLOCK_THREADS; - Dprintf("conn_connect: exec query \"%s\";", psyco_datestyle); - pgres = PQexec(pgconn, psyco_datestyle); - Py_BLOCK_THREADS; - } else { - pgres = psyco_exec_green(self, psyco_datestyle); - } - - if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { - PyErr_SetString(OperationalError, "can't set datestyle to ISO"); - IFCLEARPGRES(pgres); - Py_UNBLOCK_THREADS; - pthread_mutex_unlock(&self->lock); - Py_BLOCK_THREADS; - return -1; - } - CLEARPGRES(pgres); - } - - if (!green) { + int res; Py_UNBLOCK_THREADS; - pgres = PQexec(pgconn, psyco_transaction_isolation); + res = pq_set_guc_locked(self, "datestyle", "ISO", + &pgres, &error, &_save); Py_BLOCK_THREADS; - } else { - pgres = psyco_exec_green(self, psyco_transaction_isolation); + if (res < 0) { + pq_complete_error(self, &pgres, &error); + return -1; + } } - if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { - PyErr_SetString(OperationalError, - "can't fetch default_isolation_level"); - IFCLEARPGRES(pgres); - Py_UNBLOCK_THREADS; - pthread_mutex_unlock(&self->lock); - Py_BLOCK_THREADS; - return -1; - } - self->isolation_level = conn_get_isolation_level(pgres); + /* for reset */ + self->autocommit = 0; Py_UNBLOCK_THREADS; pthread_mutex_unlock(&self->lock); @@ -779,7 +785,7 @@ _conn_poll_setup_async(connectionObject *self) * expected to manage the transactions himself, by sending * (asynchronously) BEGIN and COMMIT statements. */ - self->isolation_level = ISOLATION_LEVEL_AUTOCOMMIT; + self->autocommit = 1; /* If the datestyle is ISO or anything else good, * we can skip the CONN_STATUS_DATESTYLE step. */ @@ -857,7 +863,7 @@ conn_poll(connectionObject *self) case CONN_STATUS_PREPARED: res = _conn_poll_query(self); - if (res == PSYCO_POLL_OK && self->async_cursor) { + if (res == PSYCO_POLL_OK && self->async && self->async_cursor) { /* An async query has just finished: parse the tuple in the * target cursor. */ cursorObject *curs; @@ -952,6 +958,77 @@ conn_rollback(connectionObject *self) return res; } +int +conn_set_session(connectionObject *self, + const char *isolevel, const char *readonly, const char *deferrable, + int autocommit) +{ + PGresult *pgres = NULL; + char *error = NULL; + int res = -1; + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + + if (isolevel) { + Dprintf("conn_set_session: setting isolation to %s", isolevel); + if ((res = pq_set_guc_locked(self, + "default_transaction_isolation", isolevel, + &pgres, &error, &_save))) { + goto endlock; + } + } + + if (readonly) { + Dprintf("conn_set_session: setting read only to %s", readonly); + if ((res = pq_set_guc_locked(self, + "default_transaction_read_only", readonly, + &pgres, &error, &_save))) { + goto endlock; + } + } + + if (deferrable) { + Dprintf("conn_set_session: setting deferrable to %s", deferrable); + if ((res = pq_set_guc_locked(self, + "default_transaction_deferrable", deferrable, + &pgres, &error, &_save))) { + goto endlock; + } + } + + if (self->autocommit != autocommit) { + Dprintf("conn_set_session: setting autocommit to %d", autocommit); + self->autocommit = autocommit; + } + + res = 0; + +endlock: + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + + if (res < 0) { + pq_complete_error(self, &pgres, &error); + } + + return res; +} + +int +conn_set_autocommit(connectionObject *self, int value) +{ + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + + self->autocommit = value; + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + + return 0; +} + /* conn_switch_isolation_level - switch isolation level on the connection */ int @@ -959,33 +1036,80 @@ conn_switch_isolation_level(connectionObject *self, int level) { PGresult *pgres = NULL; char *error = NULL; - int res = 0; + int curr_level; + int ret = -1; + + /* use only supported levels on older PG versions */ + if (self->server_version < 80000) { + if (level == 1 || level == 3) { + ++level; + } + } + + if (-1 == (curr_level = conn_get_isolation_level(self))) { + return -1; + } + + if (curr_level == level) { + /* no need to change level */ + return 0; + } + + /* Emulate the previous semantic of set_isolation_level() using the + * functions currently available. */ Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); - /* if the current isolation level is equal to the requested one don't switch */ - if (self->isolation_level != level) { + /* terminate the current transaction if any */ + if ((ret = pq_abort_locked(self, &pgres, &error, &_save))) { + goto endlock; + } - /* if the current isolation level is > 0 we need to abort the current - transaction before changing; that all folks! */ - if (self->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT) { - res = pq_abort_locked(self, &pgres, &error, &_save); + if (level == 0) { + if ((ret = pq_set_guc_locked(self, + "default_transaction_isolation", "default", + &pgres, &error, &_save))) { + goto endlock; + } + self->autocommit = 1; + } + else { + /* find the name of the requested level */ + const IsolationLevel *isolevel = conn_isolevels; + while ((++isolevel)->name) { + if (level == isolevel->value) { + break; + } + } + if (!isolevel->name) { + ret = -1; + error = strdup("bad isolation level value"); + goto endlock; } - self->isolation_level = level; - Dprintf("conn_switch_isolation_level: switched to level %d", level); + if ((ret = pq_set_guc_locked(self, + "default_transaction_isolation", isolevel->name, + &pgres, &error, &_save))) { + goto endlock; + } + self->autocommit = 0; } + Dprintf("conn_switch_isolation_level: switched to level %d", level); + +endlock: pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; - if (res < 0) + if (ret < 0) { pq_complete_error(self, &pgres, &error); + } - return res; + return ret; } + /* conn_set_client_encoding - switch client encoding on connection */ int @@ -993,7 +1117,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc) { PGresult *pgres = NULL; char *error = NULL; - char query[48]; int res = 1; char *codec = NULL; char *clean_enc = NULL; @@ -1009,16 +1132,14 @@ conn_set_client_encoding(connectionObject *self, const char *enc) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); - /* set encoding, no encoding string is longer than 24 bytes */ - PyOS_snprintf(query, 47, "SET client_encoding = '%s'", clean_enc); - /* abort the current transaction, to set the encoding ouside of transactions */ if ((res = pq_abort_locked(self, &pgres, &error, &_save))) { goto endlock; } - if ((res = pq_execute_command_locked(self, query, &pgres, &error, &_save))) { + if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, + &pgres, &error, &_save))) { goto endlock; } @@ -1042,7 +1163,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc) self->encoding, self->codec); endlock: - pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; |