summaryrefslogtreecommitdiff
path: root/contrib/dblink
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2010-06-14 20:50:04 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2010-06-14 20:50:04 +0000
commitc83c22d05327805d4014088e70ae20d0284e7adf (patch)
tree10cbb0e0148c88194d692878c804180372a3f815 /contrib/dblink
parent2e5a61f919f0f26045f0e4baefdd7ac704ba4910 (diff)
downloadpostgresql-c83c22d05327805d4014088e70ae20d0284e7adf.tar.gz
Rearrange dblink's dblink_build_sql_insert() and related routines to open and
lock the target relation just once per SQL function call. The original coding obtained and released lock several times per call. Aside from saving a not-insignificant number of cycles, this eliminates possible race conditions if someone tries to modify the relation's schema concurrently. Also centralize locking and permission-checking logic. Problem noted while investigating a trouble report from Robert Voinea --- his problem is still to be fixed, though.
Diffstat (limited to 'contrib/dblink')
-rw-r--r--contrib/dblink/dblink.c208
1 files changed, 86 insertions, 122 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 4cc7b1e4a3..15d00d888c 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -52,6 +52,7 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/acl.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -75,19 +76,19 @@ static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * con);
static void deleteConnection(const char *name);
-static char **get_pkey_attnames(Oid relid, int16 *numatts);
-static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
-static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
-static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char **get_pkey_attnames(Relation rel, int16 *numatts);
+static char *get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+static char *get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
-static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
-static Oid get_relid_from_relname(text *relname_text);
-static char *generate_relation_name(Oid relid);
+static HeapTuple get_tuple_of_interest(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
+static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
+static char *generate_relation_name(Relation rel);
static char *connstr_strip_password(const char *connstr);
static void dblink_security_check(PGconn *conn, remoteConn *rcon, const char *connstr);
-static int get_nondropped_natts(Oid relid);
+static int get_nondropped_natts(Relation rel);
/* Global */
List *res_id = NIL;
@@ -984,7 +985,6 @@ Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
int16 numatts;
- Oid relid;
char **results;
FuncCallContext *funcctx;
int32 call_cntr;
@@ -995,7 +995,8 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
- TupleDesc tupdesc = NULL;
+ Relation rel;
+ TupleDesc tupdesc;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
@@ -1006,13 +1007,13 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- /* convert relname to rel Oid */
- relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
- if (!OidIsValid(relid))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- GET_STR(PG_GETARG_TEXT_P(0)))));
+ /* open target relation */
+ rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
+
+ /* get the array of attnums */
+ results = get_pkey_attnames(rel, &numatts);
+
+ relation_close(rel, AccessShareLock);
/*
* need a tuple descriptor representing one INT and one TEXT
@@ -1031,9 +1032,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
- /* get an array of attnums */
- results = get_pkey_attnames(relid, &numatts);
-
if ((results != NULL) && (numatts > 0))
{
funcctx->max_calls = numatts;
@@ -1115,7 +1113,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
- Oid relid;
+ Relation rel;
text *relname_text;
int16 *pkattnums;
int pknumatts_tmp;
@@ -1141,14 +1139,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
relname_text = PG_GETARG_TEXT_P(0);
/*
- * Convert relname to rel OID.
+ * Open target relation.
*/
- relid = get_relid_from_relname(relname_text);
- if (!OidIsValid(relid))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- GET_STR(relname_text))));
+ rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts_tmp = PG_GETARG_INT32(2);
@@ -1175,7 +1168,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have
* non-dropped columns
*/
- nondropped_natts = get_nondropped_natts(relid);
+ nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1249,7 +1242,12 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
/*
* Prep work is finally done. Go get the SQL string.
*/
- sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+ sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+ /*
+ * Now we can close the relation.
+ */
+ relation_close(rel, AccessShareLock);
/*
* And send it
@@ -1277,7 +1275,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)
{
- Oid relid;
+ Relation rel;
text *relname_text;
int16 *pkattnums;
int pknumatts_tmp;
@@ -1298,14 +1296,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
relname_text = PG_GETARG_TEXT_P(0);
/*
- * Convert relname to rel OID.
+ * Open target relation.
*/
- relid = get_relid_from_relname(relname_text);
- if (!OidIsValid(relid))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- GET_STR(relname_text))));
+ rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts_tmp = PG_GETARG_INT32(2);
@@ -1331,7 +1324,7 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have
* non-dropped columns
*/
- nondropped_natts = get_nondropped_natts(relid);
+ nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1372,7 +1365,12 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
/*
* Prep work is finally done. Go get the SQL string.
*/
- sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+ sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
+
+ /*
+ * Now we can close the relation.
+ */
+ relation_close(rel, AccessShareLock);
/*
* And send it
@@ -1404,7 +1402,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_update);
Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)
{
- Oid relid;
+ Relation rel;
text *relname_text;
int16 *pkattnums;
int pknumatts_tmp;
@@ -1430,14 +1428,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
relname_text = PG_GETARG_TEXT_P(0);
/*
- * Convert relname to rel OID.
+ * Open target relation.
*/
- relid = get_relid_from_relname(relname_text);
- if (!OidIsValid(relid))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- GET_STR(relname_text))));
+ rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts_tmp = PG_GETARG_INT32(2);
@@ -1464,7 +1457,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have
* non-dropped columns
*/
- nondropped_natts = get_nondropped_natts(relid);
+ nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1538,7 +1531,12 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
/*
* Prep work is finally done. Go get the SQL string.
*/
- sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+ sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+ /*
+ * Now we can close the relation.
+ */
+ relation_close(rel, AccessShareLock);
/*
* And send it
@@ -1572,7 +1570,7 @@ dblink_current_query(PG_FUNCTION_ARGS)
* Return NULL, and set numatts = 0, if no primary key exists.
*/
static char **
-get_pkey_attnames(Oid relid, int16 *numatts)
+get_pkey_attnames(Relation rel, int16 *numatts)
{
Relation indexRelation;
ScanKeyData entry;
@@ -1580,22 +1578,19 @@ get_pkey_attnames(Oid relid, int16 *numatts)
HeapTuple indexTuple;
int i;
char **result = NULL;
- Relation rel;
TupleDesc tupdesc;
- /* open relation using relid, get tupdesc */
- rel = relation_open(relid, AccessShareLock);
- tupdesc = rel->rd_att;
-
/* initialize numatts to 0 in case no primary key exists */
*numatts = 0;
+ tupdesc = rel->rd_att;
+
/* use relid to get all related indexes */
indexRelation = heap_openr(IndexRelationName, AccessShareLock);
ScanKeyInit(&entry,
Anum_pg_index_indrelid,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(relid));
+ ObjectIdGetDatum(RelationGetRelid(rel)));
scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
@@ -1621,15 +1616,13 @@ get_pkey_attnames(Oid relid, int16 *numatts)
}
heap_endscan(scan);
heap_close(indexRelation, AccessShareLock);
- relation_close(rel, AccessShareLock);
return result;
}
static char *
-get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
- Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
@@ -1642,16 +1635,12 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
bool needComma;
/* get relation name including any needed schema prefix and quoting */
- relname = generate_relation_name(relid);
+ relname = generate_relation_name(rel);
- /*
- * Open relation using relid
- */
- rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
- tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+ tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
if (!tuple)
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -1711,15 +1700,13 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
- relation_close(rel, AccessShareLock);
return (sql);
}
static char *
-get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{
- Relation rel;
char *relname;
TupleDesc tupdesc;
int natts;
@@ -1729,12 +1716,8 @@ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattval
int i;
/* get relation name including any needed schema prefix and quoting */
- relname = generate_relation_name(relid);
+ relname = generate_relation_name(rel);
- /*
- * Open relation using relid
- */
- rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
@@ -1767,15 +1750,13 @@ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattval
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
- relation_close(rel, AccessShareLock);
return (sql);
}
static char *
-get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
- Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
@@ -1788,16 +1769,12 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
bool needComma;
/* get relation name including any needed schema prefix and quoting */
- relname = generate_relation_name(relid);
+ relname = generate_relation_name(rel);
- /*
- * Open relation using relid
- */
- rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
- tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+ tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
if (!tuple)
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -1866,7 +1843,6 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
- relation_close(rel, AccessShareLock);
return (sql);
}
@@ -1923,9 +1899,8 @@ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
}
static HeapTuple
-get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
+get_tuple_of_interest(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
{
- Relation rel;
char *relname;
TupleDesc tupdesc;
StringInfo str = makeStringInfo();
@@ -1936,14 +1911,9 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
char *val = NULL;
/* get relation name including any needed schema prefix and quoting */
- relname = generate_relation_name(relid);
+ relname = generate_relation_name(rel);
- /*
- * Open relation using relid
- */
- rel = relation_open(relid, AccessShareLock);
- tupdesc = CreateTupleDescCopy(rel->rd_att);
- relation_close(rel, AccessShareLock);
+ tupdesc = rel->rd_att;
/*
* Connect to SPI manager
@@ -2021,52 +1991,49 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
return NULL;
}
-static Oid
-get_relid_from_relname(text *relname_text)
+/*
+ * Open the relation named by relname_text, acquire specified type of lock,
+ * verify we have specified permissions.
+ * Caller must close rel when done with it.
+ */
+static Relation
+get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
{
RangeVar *relvar;
Relation rel;
- Oid relid;
+ AclResult aclresult;
- relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_relid_from_relname"));
- rel = heap_openrv(relvar, AccessShareLock);
- relid = RelationGetRelid(rel);
- relation_close(rel, AccessShareLock);
+ relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_rel_from_relname"));
+ rel = heap_openrv(relvar, lockmode);
- return relid;
+ aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+ aclmode);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_CLASS,
+ RelationGetRelationName(rel));
+
+ return rel;
}
/*
* generate_relation_name - copied from ruleutils.c
- * Compute the name to display for a relation specified by OID
+ * Compute the name to display for a relation
*
* The result includes all necessary quoting and schema-prefixing.
*/
static char *
-generate_relation_name(Oid relid)
+generate_relation_name(Relation rel)
{
- HeapTuple tp;
- Form_pg_class reltup;
char *nspname;
char *result;
- tp = SearchSysCache(RELOID,
- ObjectIdGetDatum(relid),
- 0, 0, 0);
- if (!HeapTupleIsValid(tp))
- elog(ERROR, "cache lookup failed for relation %u", relid);
-
- reltup = (Form_pg_class) GETSTRUCT(tp);
-
/* Qualify the name if not visible in search path */
- if (RelationIsVisible(relid))
+ if (RelationIsVisible(RelationGetRelid(rel)))
nspname = NULL;
else
- nspname = get_namespace_name(reltup->relnamespace);
+ nspname = get_namespace_name(rel->rd_rel->relnamespace);
- result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
-
- ReleaseSysCache(tp);
+ result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
return result;
}
@@ -2315,15 +2282,13 @@ dblink_security_check(PGconn *conn, remoteConn *rcon, const char *connstr)
}
static int
-get_nondropped_natts(Oid relid)
+get_nondropped_natts(Relation rel)
{
int nondropped_natts = 0;
TupleDesc tupdesc;
- Relation rel;
int natts;
int i;
- rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
@@ -2334,6 +2299,5 @@ get_nondropped_natts(Oid relid)
nondropped_natts++;
}
- relation_close(rel, AccessShareLock);
return nondropped_natts;
}