summaryrefslogtreecommitdiff
path: root/psycopg/connection_int.c
diff options
context:
space:
mode:
Diffstat (limited to 'psycopg/connection_int.c')
-rw-r--r--psycopg/connection_int.c290
1 files changed, 205 insertions, 85 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 22c5bc5..441d362 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -34,6 +34,19 @@
#include <string.h>
+/* Mapping from isolation level name to value exposed by Python.
+ * Only used for backward compatibility by the isolation_level property */
+
+const IsolationLevel conn_isolevels[] = {
+ {"", 0}, /* autocommit */
+ {"read uncommitted", 1},
+ {"read committed", 2},
+ {"repeatable read", 3},
+ {"serializable", 4},
+ {"default", -1},
+ { NULL }
+};
+
/* Return a new "string" from a char* from the database.
*
@@ -82,6 +95,10 @@ conn_notice_callback(void *args, const char *message)
self->notice_pending = notice;
}
+/* Expose the notices received as Python objects.
+ *
+ * The function should be called with the connection lock and the GIL.
+ */
void
conn_notice_process(connectionObject *self)
{
@@ -92,10 +109,6 @@ conn_notice_process(connectionObject *self)
return;
}
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&self->lock);
- Py_BLOCK_THREADS;
-
notice = self->notice_pending;
nnotices = PyList_GET_SIZE(self->notice_list);
@@ -119,10 +132,6 @@ conn_notice_process(connectionObject *self)
0, nnotices - CONN_NOTICES_LIMIT);
}
- Py_UNBLOCK_THREADS;
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
-
conn_notice_clean(self);
}
@@ -130,8 +139,6 @@ void
conn_notice_clean(connectionObject *self)
{
struct connectionObject_notice *tmp, *notice;
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&self->lock);
notice = self->notice_pending;
@@ -141,11 +148,8 @@ conn_notice_clean(connectionObject *self)
free((void*)tmp->message);
free(tmp);
}
-
+
self->notice_pending = NULL;
-
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
}
@@ -161,8 +165,6 @@ conn_notifies_process(connectionObject *self)
PyObject *notify = NULL;
PyObject *pid = NULL, *channel = NULL, *payload = NULL;
- /* TODO: we are called without the lock! */
-
while ((pgn = PQnotifies(self->pgconn)) != NULL) {
Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
@@ -358,26 +360,57 @@ exit:
return rv;
}
+
int
-conn_get_isolation_level(PGresult *pgres)
+conn_get_isolation_level(connectionObject *self)
{
- static const char lvl1a[] = "read uncommitted";
- static const char lvl1b[] = "read committed";
- int rv;
+ PGresult *pgres = NULL;
+ char *error = NULL;
+ int rv = -1;
+ char *lname;
+ const IsolationLevel *level;
+
+ /* this may get called by async connections too: here's your result */
+ if (self->autocommit) {
+ return 0;
+ }
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
- char *isolation_level = PQgetvalue(pgres, 0, 0);
+ if (!(lname = pq_get_guc_locked(self, "default_transaction_isolation",
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
- if ((strcmp(lvl1b, isolation_level) == 0) /* most likely */
- || (strcmp(lvl1a, isolation_level) == 0))
- rv = ISOLATION_LEVEL_READ_COMMITTED;
- else /* if it's not one of the lower ones, it's SERIALIZABLE */
- rv = ISOLATION_LEVEL_SERIALIZABLE;
+ /* find the value for the requested isolation level */
+ level = conn_isolevels;
+ while ((++level)->name) {
+ if (0 == strcasecmp(level->name, lname)) {
+ rv = level->value;
+ break;
+ }
+ }
+ if (-1 == rv) {
+ error = malloc(256);
+ PyOS_snprintf(error, 256,
+ "unexpected isolation level: '%s'", lname);
+ }
- CLEARPGRES(pgres);
+ free(lname);
+
+endlock:
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+
+ if (rv < 0) {
+ pq_complete_error(self, &pgres, &error);
+ }
return rv;
}
+
int
conn_get_protocol_version(PGconn *pgconn)
{
@@ -425,8 +458,8 @@ conn_is_datestyle_ok(PGconn *pgconn)
int
conn_setup(connectionObject *self, PGconn *pgconn)
{
- PGresult *pgres;
- int green;
+ PGresult *pgres = NULL;
+ char *error = NULL;
self->equote = conn_get_standard_conforming_strings(pgconn);
self->server_version = conn_get_server_version(pgconn);
@@ -450,51 +483,24 @@ conn_setup(connectionObject *self, PGconn *pgconn)
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
- green = psyco_green();
-
- if (green && (pq_set_non_blocking(self, 1, 1) != 0)) {
+ if (psyco_green() && (pq_set_non_blocking(self, 1, 1) != 0)) {
return -1;
}
if (!conn_is_datestyle_ok(self->pgconn)) {
- if (!green) {
- Py_UNBLOCK_THREADS;
- Dprintf("conn_connect: exec query \"%s\";", psyco_datestyle);
- pgres = PQexec(pgconn, psyco_datestyle);
- Py_BLOCK_THREADS;
- } else {
- pgres = psyco_exec_green(self, psyco_datestyle);
- }
-
- if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
- PyErr_SetString(OperationalError, "can't set datestyle to ISO");
- IFCLEARPGRES(pgres);
- Py_UNBLOCK_THREADS;
- pthread_mutex_unlock(&self->lock);
- Py_BLOCK_THREADS;
- return -1;
- }
- CLEARPGRES(pgres);
- }
-
- if (!green) {
+ int res;
Py_UNBLOCK_THREADS;
- pgres = PQexec(pgconn, psyco_transaction_isolation);
+ res = pq_set_guc_locked(self, "datestyle", "ISO",
+ &pgres, &error, &_save);
Py_BLOCK_THREADS;
- } else {
- pgres = psyco_exec_green(self, psyco_transaction_isolation);
+ if (res < 0) {
+ pq_complete_error(self, &pgres, &error);
+ return -1;
+ }
}
- if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
- PyErr_SetString(OperationalError,
- "can't fetch default_isolation_level");
- IFCLEARPGRES(pgres);
- Py_UNBLOCK_THREADS;
- pthread_mutex_unlock(&self->lock);
- Py_BLOCK_THREADS;
- return -1;
- }
- self->isolation_level = conn_get_isolation_level(pgres);
+ /* for reset */
+ self->autocommit = 0;
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
@@ -779,7 +785,7 @@ _conn_poll_setup_async(connectionObject *self)
* expected to manage the transactions himself, by sending
* (asynchronously) BEGIN and COMMIT statements.
*/
- self->isolation_level = ISOLATION_LEVEL_AUTOCOMMIT;
+ self->autocommit = 1;
/* If the datestyle is ISO or anything else good,
* we can skip the CONN_STATUS_DATESTYLE step. */
@@ -857,7 +863,7 @@ conn_poll(connectionObject *self)
case CONN_STATUS_PREPARED:
res = _conn_poll_query(self);
- if (res == PSYCO_POLL_OK && self->async_cursor) {
+ if (res == PSYCO_POLL_OK && self->async && self->async_cursor) {
/* An async query has just finished: parse the tuple in the
* target cursor. */
cursorObject *curs;
@@ -952,6 +958,77 @@ conn_rollback(connectionObject *self)
return res;
}
+int
+conn_set_session(connectionObject *self,
+ const char *isolevel, const char *readonly, const char *deferrable,
+ int autocommit)
+{
+ PGresult *pgres = NULL;
+ char *error = NULL;
+ int res = -1;
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
+
+ if (isolevel) {
+ Dprintf("conn_set_session: setting isolation to %s", isolevel);
+ if ((res = pq_set_guc_locked(self,
+ "default_transaction_isolation", isolevel,
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
+ }
+
+ if (readonly) {
+ Dprintf("conn_set_session: setting read only to %s", readonly);
+ if ((res = pq_set_guc_locked(self,
+ "default_transaction_read_only", readonly,
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
+ }
+
+ if (deferrable) {
+ Dprintf("conn_set_session: setting deferrable to %s", deferrable);
+ if ((res = pq_set_guc_locked(self,
+ "default_transaction_deferrable", deferrable,
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
+ }
+
+ if (self->autocommit != autocommit) {
+ Dprintf("conn_set_session: setting autocommit to %d", autocommit);
+ self->autocommit = autocommit;
+ }
+
+ res = 0;
+
+endlock:
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+
+ if (res < 0) {
+ pq_complete_error(self, &pgres, &error);
+ }
+
+ return res;
+}
+
+int
+conn_set_autocommit(connectionObject *self, int value)
+{
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
+
+ self->autocommit = value;
+
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+
+ return 0;
+}
+
/* conn_switch_isolation_level - switch isolation level on the connection */
int
@@ -959,33 +1036,80 @@ conn_switch_isolation_level(connectionObject *self, int level)
{
PGresult *pgres = NULL;
char *error = NULL;
- int res = 0;
+ int curr_level;
+ int ret = -1;
+
+ /* use only supported levels on older PG versions */
+ if (self->server_version < 80000) {
+ if (level == 1 || level == 3) {
+ ++level;
+ }
+ }
+
+ if (-1 == (curr_level = conn_get_isolation_level(self))) {
+ return -1;
+ }
+
+ if (curr_level == level) {
+ /* no need to change level */
+ return 0;
+ }
+
+ /* Emulate the previous semantic of set_isolation_level() using the
+ * functions currently available. */
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
- /* if the current isolation level is equal to the requested one don't switch */
- if (self->isolation_level != level) {
+ /* terminate the current transaction if any */
+ if ((ret = pq_abort_locked(self, &pgres, &error, &_save))) {
+ goto endlock;
+ }
- /* if the current isolation level is > 0 we need to abort the current
- transaction before changing; that all folks! */
- if (self->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT) {
- res = pq_abort_locked(self, &pgres, &error, &_save);
+ if (level == 0) {
+ if ((ret = pq_set_guc_locked(self,
+ "default_transaction_isolation", "default",
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
+ self->autocommit = 1;
+ }
+ else {
+ /* find the name of the requested level */
+ const IsolationLevel *isolevel = conn_isolevels;
+ while ((++isolevel)->name) {
+ if (level == isolevel->value) {
+ break;
+ }
+ }
+ if (!isolevel->name) {
+ ret = -1;
+ error = strdup("bad isolation level value");
+ goto endlock;
}
- self->isolation_level = level;
- Dprintf("conn_switch_isolation_level: switched to level %d", level);
+ if ((ret = pq_set_guc_locked(self,
+ "default_transaction_isolation", isolevel->name,
+ &pgres, &error, &_save))) {
+ goto endlock;
+ }
+ self->autocommit = 0;
}
+ Dprintf("conn_switch_isolation_level: switched to level %d", level);
+
+endlock:
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
- if (res < 0)
+ if (ret < 0) {
pq_complete_error(self, &pgres, &error);
+ }
- return res;
+ return ret;
}
+
/* conn_set_client_encoding - switch client encoding on connection */
int
@@ -993,7 +1117,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
{
PGresult *pgres = NULL;
char *error = NULL;
- char query[48];
int res = 1;
char *codec = NULL;
char *clean_enc = NULL;
@@ -1009,16 +1132,14 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
- /* set encoding, no encoding string is longer than 24 bytes */
- PyOS_snprintf(query, 47, "SET client_encoding = '%s'", clean_enc);
-
/* abort the current transaction, to set the encoding ouside of
transactions */
if ((res = pq_abort_locked(self, &pgres, &error, &_save))) {
goto endlock;
}
- if ((res = pq_execute_command_locked(self, query, &pgres, &error, &_save))) {
+ if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc,
+ &pgres, &error, &_save))) {
goto endlock;
}
@@ -1042,7 +1163,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
self->encoding, self->codec);
endlock:
-
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;