diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2023-04-12 19:29:21 +0200 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2023-04-12 19:29:21 +0200 |
commit | 9ce04b50e120275afbc03ca0b80839dde3da8308 (patch) | |
tree | c9b815953349fc5bf689799108a2966fec755bf7 /src/backend | |
parent | 8e82db97b0a474008d8212a63f34e449a8c50bcd (diff) | |
download | postgresql-9ce04b50e120275afbc03ca0b80839dde3da8308.tar.gz |
Revert "Catalog NOT NULL constraints" and fallout
This reverts commit e056c557aef4 and minor later fixes thereof.
There's a few problems in this new feature -- most notably regarding
pg_upgrade behavior, but others as well. This new feature is not in any
way critical on its own, so instead of scrambling to fix it we revert it
and try again in early 17 with these issues in mind.
Discussion: https://postgr.es/m/3801207.1681057430@sss.pgh.pa.us
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/heap.c | 491 | ||||
-rw-r--r-- | src/backend/catalog/pg_constraint.c | 97 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 1326 | ||||
-rw-r--r-- | src/backend/nodes/outfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/nodes/readfuncs.c | 8 | ||||
-rw-r--r-- | src/backend/optimizer/util/plancat.c | 2 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 13 | ||||
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 206 | ||||
-rw-r--r-- | src/backend/utils/adt/ruleutils.c | 14 |
9 files changed, 484 insertions, 1677 deletions
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index d9b1adfe12..2a0d82aedd 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -2161,57 +2161,6 @@ StoreRelCheck(Relation rel, const char *ccname, Node *expr, } /* - * Store a NOT NULL constraint for the given relation - * - * The OID of the new constraint is returned. - */ -static Oid -StoreRelNotNull(Relation rel, const char *nnname, AttrNumber attnum, - bool is_validated, bool is_local, int inhcount, - bool is_no_inherit) -{ - int16 attNos; - Oid constrOid; - - /* We only ever store one column per constraint */ - attNos = attnum; - - constrOid = - CreateConstraintEntry(nnname, - RelationGetNamespace(rel), - CONSTRAINT_NOTNULL, - false, - false, - is_validated, - InvalidOid, - RelationGetRelid(rel), - &attNos, - 1, - 1, - InvalidOid, /* not a domain constraint */ - InvalidOid, /* no associated index */ - InvalidOid, /* Foreign key fields */ - NULL, - NULL, - NULL, - NULL, - 0, - ' ', - ' ', - NULL, - 0, - ' ', - NULL, /* not an exclusion constraint */ - NULL, - NULL, - is_local, - inhcount, - is_no_inherit, - false); - return constrOid; -} - -/* * Store defaults and constraints (passed as a list of CookedConstraint). * * Each CookedConstraint struct is modified to store the new catalog tuple OID. @@ -2255,14 +2204,6 @@ StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal) is_internal); numchecks++; break; - - case CONSTR_NOTNULL: - con->conoid = - StoreRelNotNull(rel, con->name, con->attnum, - !con->skip_validation, con->is_local, - con->inhcount, con->is_no_inherit); - break; - default: elog(ERROR, "unrecognized constraint type: %d", (int) con->contype); @@ -2318,7 +2259,6 @@ AddRelationNewConstraints(Relation rel, ParseNamespaceItem *nsitem; int numchecks; List *checknames; - List *nnnames; ListCell *cell; Node *expr; CookedConstraint *cooked; @@ -2404,179 +2344,130 @@ AddRelationNewConstraints(Relation rel, */ numchecks = numoldchecks; checknames = NIL; - nnnames = NIL; foreach(cell, newConstraints) { Constraint *cdef = (Constraint *) lfirst(cell); + char *ccname; Oid constrOid; - if (cdef->contype == CONSTR_CHECK) + if (cdef->contype != CONSTR_CHECK) + continue; + + if (cdef->raw_expr != NULL) { - char *ccname; + Assert(cdef->cooked_expr == NULL); /* - * XXX Should we detect the case with CHECK (foo IS NOT NULL) and - * handle it as a NOT NULL constraint? + * Transform raw parsetree to executable expression, and verify + * it's valid as a CHECK constraint. */ - - if (cdef->raw_expr != NULL) - { - Assert(cdef->cooked_expr == NULL); - - /* - * Transform raw parsetree to executable expression, and - * verify it's valid as a CHECK constraint. - */ - expr = cookConstraint(pstate, cdef->raw_expr, - RelationGetRelationName(rel)); - } - else - { - Assert(cdef->cooked_expr != NULL); - - /* - * Here, we assume the parser will only pass us valid CHECK - * expressions, so we do no particular checking. - */ - expr = stringToNode(cdef->cooked_expr); - } + expr = cookConstraint(pstate, cdef->raw_expr, + RelationGetRelationName(rel)); + } + else + { + Assert(cdef->cooked_expr != NULL); /* - * Check name uniqueness, or generate a name if none was given. + * Here, we assume the parser will only pass us valid CHECK + * expressions, so we do no particular checking. */ - if (cdef->conname != NULL) - { - ListCell *cell2; + expr = stringToNode(cdef->cooked_expr); + } - ccname = cdef->conname; - /* Check against other new constraints */ - /* Needed because we don't do CommandCounterIncrement in loop */ - foreach(cell2, checknames) - { - if (strcmp((char *) lfirst(cell2), ccname) == 0) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("check constraint \"%s\" already exists", - ccname))); - } + /* + * Check name uniqueness, or generate a name if none was given. + */ + if (cdef->conname != NULL) + { + ListCell *cell2; - /* save name for future checks */ - checknames = lappend(checknames, ccname); - - /* - * Check against pre-existing constraints. If we are allowed - * to merge with an existing constraint, there's no more to do - * here. (We omit the duplicate constraint from the result, - * which is what ATAddCheckConstraint wants.) - */ - if (MergeWithExistingConstraint(rel, ccname, expr, - allow_merge, is_local, - cdef->initially_valid, - cdef->is_no_inherit)) - continue; - } - else + ccname = cdef->conname; + /* Check against other new constraints */ + /* Needed because we don't do CommandCounterIncrement in loop */ + foreach(cell2, checknames) { - /* - * When generating a name, we want to create "tab_col_check" - * for a column constraint and "tab_check" for a table - * constraint. We no longer have any info about the syntactic - * positioning of the constraint phrase, so we approximate - * this by seeing whether the expression references more than - * one column. (If the user played by the rules, the result - * is the same...) - * - * Note: pull_var_clause() doesn't descend into sublinks, but - * we eliminated those above; and anyway this only needs to be - * an approximate answer. - */ - List *vars; - char *colname; - - vars = pull_var_clause(expr, 0); - - /* eliminate duplicates */ - vars = list_union(NIL, vars); - - if (list_length(vars) == 1) - colname = get_attname(RelationGetRelid(rel), - ((Var *) linitial(vars))->varattno, - true); - else - colname = NULL; - - ccname = ChooseConstraintName(RelationGetRelationName(rel), - colname, - "check", - RelationGetNamespace(rel), - checknames); - - /* save name for future checks */ - checknames = lappend(checknames, ccname); + if (strcmp((char *) lfirst(cell2), ccname) == 0) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("check constraint \"%s\" already exists", + ccname))); } + /* save name for future checks */ + checknames = lappend(checknames, ccname); + /* - * OK, store it. + * Check against pre-existing constraints. If we are allowed to + * merge with an existing constraint, there's no more to do here. + * (We omit the duplicate constraint from the result, which is + * what ATAddCheckConstraint wants.) */ - constrOid = - StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local, - is_local ? 0 : 1, cdef->is_no_inherit, is_internal); - - numchecks++; - - cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); - cooked->contype = CONSTR_CHECK; - cooked->conoid = constrOid; - cooked->name = ccname; - cooked->attnum = 0; - cooked->expr = expr; - cooked->skip_validation = cdef->skip_validation; - cooked->is_local = is_local; - cooked->inhcount = is_local ? 0 : 1; - cooked->is_no_inherit = cdef->is_no_inherit; - cookedConstraints = lappend(cookedConstraints, cooked); + if (MergeWithExistingConstraint(rel, ccname, expr, + allow_merge, is_local, + cdef->initially_valid, + cdef->is_no_inherit)) + continue; } - else if (cdef->contype == CONSTR_NOTNULL) + else { - CookedConstraint *nncooked; - AttrNumber colnum; - char *nnname; + /* + * When generating a name, we want to create "tab_col_check" for a + * column constraint and "tab_check" for a table constraint. We + * no longer have any info about the syntactic positioning of the + * constraint phrase, so we approximate this by seeing whether the + * expression references more than one column. (If the user + * played by the rules, the result is the same...) + * + * Note: pull_var_clause() doesn't descend into sublinks, but we + * eliminated those above; and anyway this only needs to be an + * approximate answer. + */ + List *vars; + char *colname; + + vars = pull_var_clause(expr, 0); - colnum = get_attnum(RelationGetRelid(rel), - cdef->colname); - if (colnum == InvalidAttrNumber) - elog(ERROR, "invalid column name \"%s\"", cdef->colname); + /* eliminate duplicates */ + vars = list_union(NIL, vars); - if (cdef->conname) - nnname = cdef->conname; /* verify clash? */ + if (list_length(vars) == 1) + colname = get_attname(RelationGetRelid(rel), + ((Var *) linitial(vars))->varattno, + true); else - nnname = ChooseConstraintName(RelationGetRelationName(rel), - cdef->colname, - "not_null", - RelationGetNamespace(rel), - nnnames); - nnnames = lappend(nnnames, nnname); - - constrOid = - StoreRelNotNull(rel, nnname, colnum, - cdef->initially_valid, - is_local, - is_local ? 0 : 1, - cdef->is_no_inherit); - - nncooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); - nncooked->contype = CONSTR_NOTNULL; - nncooked->conoid = constrOid; - nncooked->name = nnname; - nncooked->attnum = colnum; - nncooked->expr = NULL; - nncooked->skip_validation = cdef->skip_validation; - nncooked->is_local = is_local; - nncooked->inhcount = is_local ? 0 : 1; - nncooked->is_no_inherit = cdef->is_no_inherit; - - cookedConstraints = lappend(cookedConstraints, nncooked); + colname = NULL; + + ccname = ChooseConstraintName(RelationGetRelationName(rel), + colname, + "check", + RelationGetNamespace(rel), + checknames); + + /* save name for future checks */ + checknames = lappend(checknames, ccname); } + + /* + * OK, store it. + */ + constrOid = + StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local, + is_local ? 0 : 1, cdef->is_no_inherit, is_internal); + + numchecks++; + + cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); + cooked->contype = CONSTR_CHECK; + cooked->conoid = constrOid; + cooked->name = ccname; + cooked->attnum = 0; + cooked->expr = expr; + cooked->skip_validation = cdef->skip_validation; + cooked->is_local = is_local; + cooked->inhcount = is_local ? 0 : 1; + cooked->is_no_inherit = cdef->is_no_inherit; + cookedConstraints = lappend(cookedConstraints, cooked); } /* @@ -2746,190 +2637,6 @@ MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr, return found; } -/* list_sort comparator to sort CookedConstraint by attnum */ -static int -list_cookedconstr_attnum_cmp(const ListCell *p1, const ListCell *p2) -{ - AttrNumber v1 = ((CookedConstraint *) lfirst(p1))->attnum; - AttrNumber v2 = ((CookedConstraint *) lfirst(p2))->attnum; - - if (v1 < v2) - return -1; - if (v1 > v2) - return 1; - return 0; -} - -/* - * Create the NOT NULL constraints for the relation - * - * These come from two sources: the 'constraints' list (of Constraint) is - * specified directly by the user; the 'old_notnulls' list (of - * CookedConstraint) comes from inheritance. We create one constraint - * for each column, giving priority to user-specified ones, and setting - * inhcount according to how many parents cause each column to get a - * NOT NULL constraint. If a user-specified name clashes with another - * user-specified name, an error is raised. - * - * Note that inherited constraints have two shapes: those coming from another - * NOT NULL constraint in the parent, which have a name already, and those - * coming from a PRIMARY KEY in the parent, which don't. Any name specified - * in a parent is disregarded in case of a conflict. - * - * Returns a list of AttrNumber for columns that need to have the attnotnull - * flag set. - */ -List * -AddRelationNotNullConstraints(Relation rel, List *constraints, - List *old_notnulls) -{ - List *nnnames = NIL; - List *givennames = NIL; - List *nncols = NIL; - ListCell *lc; - - /* - * First, create all NOT NULLs that are directly specified by the user. - * Note that inheritance might have given us another source for each, so - * we must scan the old_notnulls list and increment inhcount for each - * element with identical attnum. We delete from there any element that - * we process. - */ - foreach(lc, constraints) - { - Constraint *constr = lfirst_node(Constraint, lc); - AttrNumber attnum; - char *conname; - bool is_local = true; - int inhcount = 0; - ListCell *lc2; - - attnum = get_attnum(RelationGetRelid(rel), constr->colname); - - foreach(lc2, old_notnulls) - { - CookedConstraint *old = (CookedConstraint *) lfirst(lc2); - - if (old->attnum == attnum) - { - inhcount++; - old_notnulls = foreach_delete_current(old_notnulls, lc2); - } - } - - /* - * Determine a constraint name, which may have been specified by the - * user, or raise an error if a conflict exists with another - * user-specified name. - */ - if (constr->conname) - { - foreach(lc2, givennames) - { - if (strcmp(lfirst(lc2), conname) == 0) - ereport(ERROR, - errmsg("constraint name \"%s\" is already in use in relation \"%s\"", - constr->conname, - RelationGetRelationName(rel))); - } - - conname = constr->conname; - givennames = lappend(givennames, conname); - } - else - conname = ChooseConstraintName(RelationGetRelationName(rel), - get_attname(RelationGetRelid(rel), - attnum, false), - "not_null", - RelationGetNamespace(rel), - nnnames); - nnnames = lappend(nnnames, conname); - - StoreRelNotNull(rel, conname, - attnum, true, is_local, - inhcount, constr->is_no_inherit); - - nncols = lappend_int(nncols, attnum); - } - - /* - * If any column remains in the additional_notnulls list, we must create a - * NOT NULL constraint marked not-local. Because multiple parents could - * specify a NOT NULL for the same column, we must count how many there - * are and set inhcount accordingly, deleting elements we've already - * processed. - * - * We don't use foreach() here because we have two nested loops over the - * cooked constraint list, with possible element deletions in the inner one. - * If we used foreach_delete_current() it could only fix up the state of one - * of the loops, so it seems cleaner to use looping over list indexes for - * both loops. Note that any deletion will happen beyond where the outer - * loop is, so its index never needs adjustment. - */ - list_sort(old_notnulls, list_cookedconstr_attnum_cmp); - for (int outerpos = 0; outerpos < list_length(old_notnulls); outerpos++) - { - CookedConstraint *cooked; - char *conname = NULL; - int inhcount = 1; - ListCell *lc2; - - cooked = (CookedConstraint *) list_nth(old_notnulls, outerpos); - - /* We just preserve the first constraint name we come across, if any */ - if (conname == NULL && cooked->name) - conname = cooked->name; - - for (int restpos = outerpos + 1; restpos < list_length(old_notnulls);) - { - CookedConstraint *other; - - other = (CookedConstraint *) list_nth(old_notnulls, restpos); - if (other->attnum == cooked->attnum) - { - if (conname == NULL && other->name) - conname = other->name; - - inhcount++; - old_notnulls = list_delete_nth_cell(old_notnulls, restpos); - } - else - restpos++; - } - - /* If we got a name, make sure it isn't one we've already used */ - if (conname != NULL) - { - foreach(lc2, nnnames) - { - if (strcmp(lfirst(lc2), conname) == 0) - { - conname = NULL; - break; - } - } - } - - /* and choose a name, if needed */ - if (conname == NULL) - conname = ChooseConstraintName(RelationGetRelationName(rel), - get_attname(RelationGetRelid(rel), - cooked->attnum, false), - "not_null", - RelationGetNamespace(rel), - nnnames); - nnnames = lappend(nnnames, conname); - - StoreRelNotNull(rel, conname, cooked->attnum, true, - false, inhcount, - cooked->is_no_inherit); - - nncols = lappend_int(nncols, cooked->attnum); - } - - return nncols; -} - /* * Update the count of constraints in the relation's pg_class tuple. * diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index df73678cce..4002317f70 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -563,103 +563,6 @@ ChooseConstraintName(const char *name1, const char *name2, } /* - * Find and return the pg_constraint tuple that implements a validated - * NOT NULL constraint for the given column of the given relation. - * - * XXX This would be easier if we had pg_attribute.notnullconstr with the OID - * of the constraint that implements the NOT NULL constraint for that column. - * I'm not sure it's worth the catalog bloat and de-normalization, however. - */ -HeapTuple -findNotNullConstraintAttnum(Relation rel, AttrNumber attnum) -{ - Relation pg_constraint; - HeapTuple conTup, - retval = NULL; - SysScanDesc scan; - ScanKeyData key; - - pg_constraint = table_open(ConstraintRelationId, AccessShareLock); - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, - true, NULL, 1, &key); - - while (HeapTupleIsValid(conTup = systable_getnext(scan))) - { - Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup); - AttrNumber conkey; - - /* - * We're looking for a NOTNULL constraint that's marked validated, - * with the column we're looking for as the sole element in conkey. - */ - if (con->contype != CONSTRAINT_NOTNULL) - continue; - if (!con->convalidated) - continue; - - conkey = extractNotNullColumn(conTup); - if (conkey != attnum) - continue; - - /* Found it */ - retval = heap_copytuple(conTup); - break; - } - - systable_endscan(scan); - table_close(pg_constraint, AccessShareLock); - - return retval; -} - -/* - * Find and return the pg_constraint tuple that implements a validated - * NOT NULL constraint for the given column of the given relation. - */ -HeapTuple -findNotNullConstraint(Relation rel, const char *colname) -{ - AttrNumber attnum = get_attnum(RelationGetRelid(rel), colname); - - return findNotNullConstraintAttnum(rel, attnum); -} - -/* - * Given a pg_constraint tuple for a NOT NULL constraint, return the column - * number it is for. - */ -AttrNumber -extractNotNullColumn(HeapTuple constrTup) -{ - AttrNumber colnum; - Datum adatum; - ArrayType *arr; - - /* only tuples for CHECK constraints should be given */ - Assert(((Form_pg_constraint) GETSTRUCT(constrTup))->contype == CONSTRAINT_NOTNULL); - - adatum = SysCacheGetAttrNotNull(CONSTROID, constrTup, - Anum_pg_constraint_conkey); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - if (ARR_NDIM(arr) != 1 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID || - ARR_DIMS(arr)[0] != 1) - elog(ERROR, "conkey is not a 1-D smallint array"); - - memcpy(&colnum, ARR_DATA_PTR(arr), sizeof(AttrNumber)); - - if ((Pointer) arr != DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ - - return colnum; -} - -/* * Delete a single constraint record. */ void diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 03466e495a..d9bbeafd82 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -203,7 +203,7 @@ typedef struct AlteredTableInfo typedef struct NewConstraint { char *name; /* Constraint name, or NULL if none */ - ConstrType contype; /* CHECK, FOREIGN */ + ConstrType contype; /* CHECK or FOREIGN */ Oid refrelid; /* PK rel, if FOREIGN */ Oid refindid; /* OID of PK's index, if FOREIGN */ Oid conid; /* OID of pg_constraint entry, if FOREIGN */ @@ -350,8 +350,7 @@ static void truncate_check_activity(Relation rel); static void RangeVarCallbackForTruncate(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); static List *MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr, - List **additional_notnulls); + bool is_partition, List **supconstr); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); @@ -432,16 +431,14 @@ static bool check_for_column_name_collision(Relation rel, const char *colname, bool if_not_exists); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid); -static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode); -static void set_attnotnull(List **wqueue, Relation rel, - AttrNumber attnum, bool recurse, LOCKMODE lockmode); -static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, - char *constrname, char *colName, - bool recurse, bool recursing, - List **readyRels, LOCKMODE lockmode); -static void ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode); +static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing); +static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode); +static void ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, + AlterTableUtilityContext *context); +static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode); static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, const char *colName, LOCKMODE lockmode); static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr); @@ -544,11 +541,6 @@ static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode); -static ObjectAddress dropconstraint_internal(Relation rel, - HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, - bool missing_ok, List **readyRels, - LOCKMODE lockmode); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -624,7 +616,7 @@ static void RemoveInheritance(Relation child_rel, Relation parent_rel, static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, AlterTableUtilityContext *context); -static void AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel); +static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel); static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, List *partConstraint, bool validate_default); @@ -642,7 +634,6 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); -static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx); static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, char *compression); @@ -680,10 +671,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, TupleDesc descriptor; List *inheritOids; List *old_constraints; - List *old_notnulls; List *rawDefaults; List *cookedDefaults; - List *nncols; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -873,13 +862,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, MergeAttributes(stmt->tableElts, inheritOids, stmt->relation->relpersistence, stmt->partbound != NULL, - &old_constraints, &old_notnulls); + &old_constraints); /* * Create a tuple descriptor from the relation schema. Note that this - * deals with column names, types, and in-descriptor NOT NULL flags, but - * not default values, NOT NULL or CHECK constraints; we handle those - * below. + * deals with column names, types, and NOT NULL constraints, but not + * default values or CHECK constraints; we handle those below. */ descriptor = BuildDescForRelation(stmt->tableElts); @@ -1262,17 +1250,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, AddRelationNewConstraints(rel, NIL, stmt->constraints, true, true, false, queryString); - /* - * Finally, merge the NOT NULL constraints that are directly declared with - * those that come from parent relations (making sure to count inheritance - * appropriately for each), create them, and set the attnotnull flag on - * columns that don't yet have it. - */ - nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, - old_notnulls); - foreach(listptr, nncols) - set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock); - ObjectAddressSet(address, RelationRelationId, relationId); /* @@ -2321,8 +2298,6 @@ storage_name(char c) * Output arguments: * 'supconstr' receives a list of constraints belonging to the parents, * updated as necessary to be valid for the child. - * 'nnconstraints' receives a list of CookedConstraints that corresponds to - * constraints coming from inheritance parents. * * Return value: * Completed schema list. @@ -2353,10 +2328,7 @@ storage_name(char c) * * Constraints (including NOT NULL constraints) for the child table * are the union of all relevant constraints, from both the child schema - * and parent tables. In addition, in legacy inheritance, each column that - * appears in a primary key in any of the parents also gets a NOT NULL - * constraint (partitioning doesn't need this, because the PK itself gets - * inherited.) + * and parent tables. * * The default value for a child column is defined as: * (1) If the child schema specifies a default, that value is used. @@ -2375,11 +2347,10 @@ storage_name(char c) */ static List * MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr, List **supnotnulls) + bool is_partition, List **supconstr) { List *inhSchema = NIL; List *constraints = NIL; - List *nnconstraints = NIL; bool have_bogus_defaults = false; int child_attno; static Node bogus_marker = {0}; /* marks conflicting defaults */ @@ -2491,13 +2462,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence, AttrMap *newattmap; List *inherited_defaults; List *cols_with_defaults; - List *nnconstrs; AttrNumber parent_attno; ListCell *lc1; ListCell *lc2; - Bitmapset *pkattrs; - Bitmapset *nncols = NULL; - /* caller already got lock */ relation = table_open(parent, NoLock); @@ -2586,20 +2553,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* We can't process inherited defaults until newattmap is complete. */ inherited_defaults = cols_with_defaults = NIL; - /* - * All columns that are part of the parent's primary key need to be - * NOT NULL; if partition just the attnotnull bit, otherwise a full - * constraint (if they don't have one already). Also, we request - * attnotnull on columns that have a NOT NULL constraint that's not - * marked NO INHERIT. - */ - pkattrs = RelationGetIndexAttrBitmap(relation, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - nnconstrs = RelationGetNotNullConstraints(relation, true); - foreach(lc1, nnconstrs) - nncols = bms_add_member(nncols, - ((CookedConstraint *) lfirst(lc1))->attnum); - for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { @@ -2695,38 +2648,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } /* - * In regular inheritance, columns in the parent's primary key - * get an extra CHECK (NOT NULL) constraint. Partitioning - * doesn't need this, because the PK itself is going to be - * cloned to the partition. - */ - if (!is_partition && - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - { - CookedConstraint *nn; - - nn = palloc(sizeof(CookedConstraint)); - nn->contype = CONSTR_NOTNULL; - nn->conoid = InvalidOid; - nn->name = NULL; - nn->attnum = exist_attno; - nn->expr = NULL; - nn->skip_validation = false; - nn->is_local = false; - nn->inhcount = 1; - nn->is_no_inherit = false; - - nnconstraints = lappend(nnconstraints, nn); - } - - /* - * mark attnotnull if parent has it and it's not NO INHERIT + * Merge of NOT NULL constraints = OR 'em together */ - if (bms_is_member(parent_attno, nncols) || - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - def->is_not_null = true; + def->is_not_null |= attribute->attnotnull; /* * Check for GENERATED conflicts @@ -2760,11 +2684,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, attribute->atttypmod); def->inhcount = 1; def->is_local = false; - /* mark attnotnull if parent has it and it's not NO INHERIT */ - if (bms_is_member(parent_attno, nncols) || - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - def->is_not_null = true; + def->is_not_null = attribute->attnotnull; def->is_from_type = false; def->storage = attribute->attstorage; def->raw_default = NULL; @@ -2781,33 +2701,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->compression = NULL; inhSchema = lappend(inhSchema, def); newattmap->attnums[parent_attno - 1] = ++child_attno; - - /* - * In regular inheritance, columns in the parent's primary key - * get an extra NOT NULL constraint. Partitioning doesn't - * need this, because the PK itself is going to be cloned to - * the partition. - */ - if (!is_partition && - bms_is_member(parent_attno - - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - { - CookedConstraint *nn; - - nn = palloc(sizeof(CookedConstraint)); - nn->contype = CONSTR_NOTNULL; - nn->conoid = InvalidOid; - nn->name = NULL; - nn->attnum = newattmap->attnums[parent_attno - 1]; - nn->expr = NULL; - nn->skip_validation = false; - nn->is_local = false; - nn->inhcount = 1; - nn->is_no_inherit = false; - - nnconstraints = lappend(nnconstraints, nn); - } } /* @@ -2952,19 +2845,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } } - /* - * Also copy the NOT NULL constraints from this parent. The - * attnotnull markings were already installed above. - */ - foreach(lc1, nnconstrs) - { - CookedConstraint *nn = lfirst(lc1); - - nn->attnum = newattmap->attnums[nn->attnum - 1]; - - nnconstraints = lappend(nnconstraints, nn); - } - free_attrmap(newattmap); /* @@ -3171,7 +3051,8 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* * Now that we have the column definition list for a partition, we can * check whether the columns referenced in the column constraint specs - * actually exist. + * actually exist. Also, we merge parent's NOT NULL constraints and + * defaults into each corresponding column definition. */ if (is_partition) { @@ -3277,8 +3158,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } *supconstr = constraints; - *supnotnulls = nnconstraints; - return schema; } @@ -3330,85 +3209,6 @@ MergeCheckConstraint(List *constraints, char *name, Node *expr) return false; } -/* - * RelationGetNotNullConstraints -- get list of NOT NULL constraints - * - * Caller can request cooked constraints, or raw. - * - * This is seldom needed, so we just scan pg_constraint each time. - * - * XXX This is only used to create derived tables, so NO INHERIT constraints - * are always skipped. - */ -List * -RelationGetNotNullConstraints(Relation relation, bool cooked) -{ - List *notnulls = NIL; - Relation constrRel; - HeapTuple htup; - SysScanDesc conscan; - ScanKeyData skey; - - constrRel = table_open(ConstraintRelationId, AccessShareLock); - ScanKeyInit(&skey, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(relation))); - conscan = systable_beginscan(constrRel, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &skey); - - while (HeapTupleIsValid(htup = systable_getnext(conscan))) - { - Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(htup); - AttrNumber colnum; - - if (conForm->contype != CONSTRAINT_NOTNULL) - continue; - if (conForm->connoinherit) - continue; - - colnum = extractNotNullColumn(htup); - - if (cooked) - { - CookedConstraint *cooked; - - cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); - - cooked->contype = CONSTR_NOTNULL; - cooked->name = pstrdup(NameStr(conForm->conname)); - cooked->attnum = colnum; - cooked->expr = NULL; - cooked->skip_validation = false; - cooked->is_local = true; - cooked->inhcount = 0; - cooked->is_no_inherit = conForm->connoinherit; - - notnulls = lappend(notnulls, cooked); - } - else - { - Constraint *constr; - - constr = makeNode(Constraint); - constr->contype = CONSTR_NOTNULL; - constr->conname = pstrdup(NameStr(conForm->conname)); - constr->deferrable = false; - constr->initdeferred = false; - constr->location = -1; - constr->colname = get_attname(RelationGetRelid(relation), - colnum, false); - constr->skip_validation = false; - constr->initially_valid = true; - notnulls = lappend(notnulls, constr); - } - } - - systable_endscan(conscan); - table_close(constrRel, AccessShareLock); - - return notnulls; -} /* * StoreCatalogInheritance @@ -3969,10 +3769,7 @@ rename_constraint_internal(Oid myrelid, constraintOid); con = (Form_pg_constraint) GETSTRUCT(tuple); - if (myrelid && - (con->contype == CONSTRAINT_CHECK || - con->contype == CONSTRAINT_NOTNULL) && - !con->connoinherit) + if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit) { if (recurse) { @@ -4557,7 +4354,6 @@ AlterTableGetLockLevel(List *cmds) case AT_AddIndexConstraint: case AT_ReplicaIdentity: case AT_SetNotNull: - case AT_SetAttNotNull: case AT_EnableRowSecurity: case AT_DisableRowSecurity: case AT_ForceRowSecurity: @@ -4856,23 +4652,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Set up recursion for phase 2; no other prep needed */ - if (recurse) - cmd->recurse = true; + ATPrepDropNotNull(rel, recurse, recursing); + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); /* Need command-specific recursion decision */ - if (recurse) - cmd->recurse = true; - pass = AT_PASS_COL_ATTRS; - break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding - * a constraint */ - ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Need command-specific recursion decision */ - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_CheckNotNull: /* check column is already marked NOT NULL */ @@ -5257,14 +5045,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode); break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode); + address = ATExecDropNotNull(rel, cmd->name, lockmode); break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name, - cmd->recurse, false, NULL, lockmode); - break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull */ - ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode); + address = ATExecSetNotNull(tab, rel, cmd->name, lockmode); break; case AT_CheckNotNull: /* check column is already marked NOT NULL */ ATExecCheckNotNull(tab, rel, cmd->name, lockmode); @@ -5603,8 +5387,11 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, */ switch (cmd2->subtype) { - case AT_SetAttNotNull: - ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context); + case AT_SetNotNull: + /* Need command-specific recursion decision */ + ATPrepSetNotNull(wqueue, rel, cmd2, + recurse, false, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_AddIndex: @@ -6280,7 +6067,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) RelationGetRelationName(oldrel)), errtableconstraint(oldrel, con->name))); break; - case CONSTR_NOTNULL: case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -6389,8 +6175,6 @@ alter_table_type_to_string(AlterTableType cmdtype) return "ALTER COLUMN ... DROP NOT NULL"; case AT_SetNotNull: return "ALTER COLUMN ... SET NOT NULL"; - case AT_SetAttNotNull: - return NULL; /* not real grammar */ case AT_DropExpression: return "ALTER COLUMN ... DROP EXPRESSION"; case AT_CheckNotNull: @@ -6990,7 +6774,8 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, */ static ObjectAddress ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, - AlterTableCmd **cmd, bool recurse, bool recursing, + AlterTableCmd **cmd, + bool recurse, bool recursing, LOCKMODE lockmode, int cur_pass, AlterTableUtilityContext *context) { @@ -7505,19 +7290,41 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) /* * ALTER TABLE ALTER COLUMN DROP NOT NULL - * + */ + +static void +ATPrepDropNotNull(Relation rel, bool recurse, bool recursing) +{ + /* + * If the parent is a partitioned table, like check constraints, we do not + * support removing the NOT NULL while partitions exist. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc partdesc = RelationGetPartitionDesc(rel, true); + + Assert(partdesc != NULL); + if (partdesc->nparts > 0 && !recurse && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot remove constraint from only the partitioned table when partitions exist"), + errhint("Do not specify the ONLY keyword."))); + } +} + +/* * Return the address of the modified column. If the column was already * nullable, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode) +ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - HeapTuple conTup; Form_pg_attribute attTup; AttrNumber attnum; Relation attr_rel; + List *indexoidlist; + ListCell *indexoidscan; ObjectAddress address; /* @@ -7533,15 +7340,6 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); attTup = (Form_pg_attribute) GETSTRUCT(tuple); attnum = attTup->attnum; - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); - - /* If the column is already nullable there's nothing to do. */ - if (!attTup->attnotnull) - { - table_close(attr_rel, RowExclusiveLock); - return InvalidObjectAddress; - } /* Prevent them from altering a system attribute */ if (attnum <= 0) @@ -7557,43 +7355,68 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); /* - * It's not OK to remove a constraint only for the parent and leave it in - * the children, so disallow that. + * Check that the attribute is not in a primary key or in an index used as + * a replica identity. + * + * Note: we'll throw error even if the pkey index is not valid. */ - if (!recurse) + + /* Loop over all indexes on the relation */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - PartitionDesc partdesc; + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index indexStruct; + int i; - partdesc = RelationGetPartitionDesc(rel, true); + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - if (partdesc->nparts > 0) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot remove constraint from only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword.")); - } - else if (rel->rd_rel->relhassubclass && - find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) + /* + * If the index is not a primary key or an index used as replica + * identity, skip the check. + */ + if (indexStruct->indisprimary || indexStruct->indisreplident) { - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("NOT NULL constraint on column \"%s\" must be removed in child tables too", - colName), - errhint("Do not specify the ONLY keyword.")); + /* + * Loop over each attribute in the primary key or the index used + * as replica identity and see if it matches the to-be-altered + * attribute. + */ + for (i = 0; i < indexStruct->indnkeyatts; i++) + { + if (indexStruct->indkey.values[i] == attnum) + { + if (indexStruct->indisprimary) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in a primary key", + colName))); + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in index used as replica identity", + colName))); + } + } } + + ReleaseSysCache(indexTuple); } - /* - * If rel is partition, shouldn't drop NOT NULL if parent has the same. - */ + list_free(indexoidlist); + + /* If rel is partition, shouldn't drop NOT NULL if parent has the same */ if (rel->rd_rel->relispartition) { - Oid parentId = get_partition_parent(RelationGetRelid(rel), false); - Relation parent = table_open(parentId, AccessShareLock); - TupleDesc tupDesc = RelationGetDescr(parent); - AttrNumber parent_attnum; + Oid parentId = get_partition_parent(RelationGetRelid(rel), false); + Relation parent = table_open(parentId, AccessShareLock); + TupleDesc tupDesc = RelationGetDescr(parent); + AttrNumber parent_attnum; parent_attnum = get_attnum(parentId, colName); if (TupleDescAttr(tupDesc, parent_attnum - 1)->attnotnull) @@ -7605,33 +7428,22 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Find the constraint that makes this column NOT NULL. + * Okay, actually perform the catalog change ... if needed */ - conTup = findNotNullConstraint(rel, colName); - if (conTup == NULL) + if (attTup->attnotnull) { - Bitmapset *pkcols; + attTup->attnotnull = false; - /* - * There's no NOT NULL constraint, so throw an error. If the column - * is in a primary key, we can throw a specific error. Otherwise, - * this is unexpected. - */ - pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY); - if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in a primary key", colName)); + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - /* this shouldn't happen */ - elog(ERROR, "no NOT NULL constraint found to drop"); + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } + else + address = InvalidObjectAddress; - dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false, - false, NULL, lockmode); - - heap_freetuple(conTup); + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), attnum); table_close(attr_rel, RowExclusiveLock); @@ -7639,126 +7451,102 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 - * to verify it; recurses to apply the same to children. - * - * When called to alter an existing table, 'wqueue' must be given so that we can - * queue a check that existing tuples pass the constraint. When called from - * table creation, 'wqueue' should be passed as NULL. + * ALTER TABLE ALTER COLUMN SET NOT NULL */ + static void -set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse, - LOCKMODE lockmode) +ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, AlterTableUtilityContext *context) { - HeapTuple tuple; - Form_pg_attribute attForm; - List *children; - ListCell *lc; + /* + * If we're already recursing, there's nothing to do; the topmost + * invocation of ATSimpleRecursion already visited all children. + */ + if (recursing) + return; - tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - attnum, RelationGetRelid(rel)); - attForm = (Form_pg_attribute) GETSTRUCT(tuple); - if (!attForm->attnotnull) + /* + * If the target column is already marked NOT NULL, we can skip recursing + * to children, because their columns should already be marked NOT NULL as + * well. But there's no point in checking here unless the relation has + * some children; else we can just wait till execution to check. (If it + * does have children, however, this can save taking per-child locks + * unnecessarily. This greatly improves concurrency in some parallel + * restore scenarios.) + * + * Unfortunately, we can only apply this optimization to partitioned + * tables, because traditional inheritance doesn't enforce that child + * columns be NOT NULL when their parent is. (That's a bug that should + * get fixed someday.) + */ + if (rel->rd_rel->relhassubclass && + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - Relation attr_rel; - - attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - - attForm->attnotnull = true; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); + HeapTuple tuple; + bool attnotnull; - table_close(attr_rel, RowExclusiveLock); + tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name); - /* - * And set up for existing values to be checked, unless another constraint - * already proves this. - */ - if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm)) - { - AlteredTableInfo *tab; + /* Might as well throw the error now, if name is bad */ + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + cmd->name, RelationGetRelationName(rel)))); - tab = ATGetQueueEntry(wqueue, rel); - tab->verify_new_notnull = true; - } + attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull; + ReleaseSysCache(tuple); + if (attnotnull) + return; } - /* if no recursion is desired, we're done */ - if (!recurse) - return; - - children = find_inheritance_children(RelationGetRelid(rel), lockmode); - foreach(lc, children) + /* + * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table, + * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use + * normal recursion logic. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + !recurse) { - Oid childrelid = lfirst_oid(lc); - Relation childrel; - AttrNumber childattno; - - /* find_inheritance_children already got lock */ - childrel = table_open(childrelid, NoLock); - CheckTableNotInUse(childrel, "ALTER TABLE"); + AlterTableCmd *newcmd = makeNode(AlterTableCmd); - childattno = get_attnum(RelationGetRelid(childrel), - get_attname(RelationGetRelid(rel), attnum, - false)); - set_attnotnull(wqueue, childrel, childattno, - recurse, lockmode); - table_close(childrel, NoLock); + newcmd->subtype = AT_CheckNotNull; + newcmd->name = pstrdup(cmd->name); + ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context); } + else + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); } /* - * ALTER TABLE ALTER COLUMN SET NOT NULL - * - * Add a NOT NULL constraint to a single table and its children. Returns - * the address of the constraint added to the parent relation, if one gets - * added, or InvalidObjectAddress otherwise. - * - * We must recurse to child tables during execution, rather than using - * ALTER TABLE's normal prep-time recursion. + * Return the address of the modified column. If the column was already NOT + * NULL, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, - bool recurse, bool recursing, List **readyRels, - LOCKMODE lockmode) +ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - Relation constr_rel; - ScanKeyData skey; - SysScanDesc conscan; AttrNumber attnum; + Relation attr_rel; ObjectAddress address; - Constraint *constraint; - CookedConstraint *ccon; - List *cooked; - List *ready = NIL; /* - * In cases of multiple inheritance, we might visit the same child more - * than once. In the topmost call, set up a list that we fill with all - * visited relations, to skip those. + * lookup the attribute */ - if (readyRels == NULL) - readyRels = &ready; - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); + attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - /* At top level, permission check was done in ATPrepCmd, else do it */ - if (recursing) - { - ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - Assert(conName != NULL); - } + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel)))); + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; + /* Prevent them from altering a system attribute */ if (attnum <= 0) ereport(ERROR, @@ -7766,168 +7554,43 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, errmsg("cannot alter system column \"%s\"", colName))); - /* See if there's already a constraint */ - constr_rel = table_open(ConstraintRelationId, RowExclusiveLock); - ScanKeyInit(&skey, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); - conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &skey); - - while (HeapTupleIsValid(tuple = systable_getnext(conscan))) + /* + * Okay, actually perform the catalog change ... if needed + */ + if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) { - Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); - bool changed = false; - HeapTuple copytup; + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true; - if (conForm->contype != CONSTRAINT_NOTNULL) - continue; - - if (extractNotNullColumn(tuple) != attnum) - continue; - - copytup = heap_copytuple(tuple); - conForm = (Form_pg_constraint) GETSTRUCT(copytup); + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); /* - * If we find an appropriate constraint, we're almost done, but just - * need to change some properties on it: if we're recursing, increment - * coninhcount; if not, set conislocal if not already set. + * Ordinarily phase 3 must ensure that no NULLs exist in columns that + * are set NOT NULL; however, if we can find a constraint which proves + * this then we can skip that. We needn't bother looking if we've + * already found that we must verify some other NOT NULL constraint. */ - if (recursing) - { - conForm->coninhcount++; - changed = true; - } - else if (!conForm->conislocal) + if (!tab->verify_new_notnull && + !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple))) { - conForm->conislocal = true; - changed = true; - } - - if (changed) - { - CatalogTupleUpdate(constr_rel, ©tup->t_self, copytup); - ObjectAddressSet(address, ConstraintRelationId, conForm->oid); + /* Tell Phase 3 it needs to test the constraint */ + tab->verify_new_notnull = true; } - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - if (changed) - return address; - else - return InvalidObjectAddress; - } - - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - /* - * If we're asked not to recurse, and children exist, raise an error. - */ - if (!recurse && - find_inheritance_children(RelationGetRelid(rel), - NoLock) != NIL) - { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot add constraint to only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword.")); - else - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot add constraint only to table with inheritance children"), - errhint("Do not specify the ONLY keyword.")); - } - - /* - * No constraint exists; we must add one. First determine a name to use, - * if we haven't already. - */ - if (!recursing) - { - Assert(conName == NULL); - conName = ChooseConstraintName(RelationGetRelationName(rel), - colName, "not_null", - RelationGetNamespace(rel), - NIL); + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } - constraint = makeNode(Constraint); - constraint->contype = CONSTR_NOTNULL; - constraint->conname = conName; - constraint->deferrable = false; - constraint->initdeferred = false; - constraint->location = -1; - constraint->colname = colName; - constraint->skip_validation = false; - constraint->initially_valid = true; - - /* and do it */ - cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint), - false, !recursing, false, NULL); - ccon = linitial(cooked); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); - - /* - * Mark pg_attribute.attnotnull for the column. Tell that function not to - * recurse, because we're going to do it here. - */ - set_attnotnull(wqueue, rel, attnum, false, lockmode); - - /* - * Recurse to propagate the constraint to children that don't have one. - */ - if (recurse) - { - List *children; - ListCell *lc; - - children = find_inheritance_children(RelationGetRelid(rel), - lockmode); - - foreach(lc, children) - { - Relation childrel; - - childrel = table_open(lfirst_oid(lc), NoLock); + else + address = InvalidObjectAddress; - ATExecSetNotNull(wqueue, childrel, - conName, colName, recurse, true, - readyRels, lockmode); + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), attnum); - table_close(childrel, NoLock); - } - } + table_close(attr_rel, RowExclusiveLock); return address; } /* - * ALTER TABLE ALTER COLUMN SET ATTNOTNULL - * - * This doesn't exist in the grammar; it's used when creating a - * primary key and the column is not already marked attnotnull. - */ -static void -ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode) -{ - AttrNumber attnum; - - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) /* XXX should not happen .. elog? */ - ereport(ERROR, - errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel))); - - set_attnotnull(wqueue, rel, attnum, false, lockmode); -} - -/* * ALTER TABLE ALTER COLUMN CHECK NOT NULL * * This doesn't exist in the grammar, but we generate AT_CheckNotNull @@ -9209,14 +8872,13 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Assert(IsA(newConstraint, Constraint)); /* - * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and - * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in - * parse_utilcmd.c). + * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes + * arriving here (see the preprocessing done in parse_utilcmd.c). Use a + * switch anyway to make it easier to add more code later. */ switch (newConstraint->contype) { case CONSTR_CHECK: - case CONSTR_NOTNULL: address = ATAddCheckConstraint(wqueue, tab, rel, newConstraint, recurse, false, is_readd, @@ -9301,9 +8963,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) } /* - * Add a check or NOT NULL constraint to a single table and its children. - * Returns the address of the constraint added to the parent relation, - * if one gets added, or InvalidObjectAddress otherwise. + * Add a check constraint to a single table and its children. Returns the + * address of the constraint added to the parent relation, if one gets added, + * or InvalidObjectAddress otherwise. * * Subroutine for ATExecAddConstraint. * @@ -9356,7 +9018,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL) + if (!ccon->skip_validation) { NewConstraint *newcon; @@ -9372,14 +9034,6 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (constr->conname == NULL) constr->conname = ccon->name; - /* - * If adding a NOT NULL constraint, set the pg_attribute flag and - * tell phase 3 to verify existing rows, if needed. - */ - if (constr->contype == CONSTR_NOTNULL) - set_attnotnull(wqueue, rel, ccon->attnum, - !ccon->is_no_inherit, lockmode); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } @@ -12304,11 +11958,16 @@ ATExecDropConstraint(Relation rel, const char *constrName, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode) { + List *children; + ListCell *child; Relation conrel; + Form_pg_constraint con; SysScanDesc scan; ScanKeyData skey[3]; HeapTuple tuple; bool found = false; + bool is_no_inherit_constraint = false; + char contype; /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) @@ -12337,258 +11996,80 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* There can be at most one matching row */ if (HeapTupleIsValid(tuple = systable_getnext(scan))) { - dropconstraint_internal(rel, tuple, behavior, recurse, recursing, - missing_ok, NULL, lockmode); - found = true; - } + ObjectAddress conobj; - systable_endscan(scan); + con = (Form_pg_constraint) GETSTRUCT(tuple); - if (!found) - { - if (!missing_ok) + /* Don't drop inherited constraints */ + if (con->coninhcount > 0 && !recursing) ereport(ERROR, - errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, RelationGetRelationName(rel))); - else - ereport(NOTICE, - errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", - constrName, RelationGetRelationName(rel))); - } - - table_close(conrel, RowExclusiveLock); -} - -/* - * Remove a constraint, using its pg_constraint tuple - * - * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN - * DROP NOT NULL. - * - * Returns the address of the constraint being removed. - */ -static ObjectAddress -dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, bool missing_ok, List **readyRels, - LOCKMODE lockmode) -{ - Relation conrel; - Form_pg_constraint con; - ObjectAddress conobj; - List *children; - ListCell *child; - bool is_no_inherit_constraint = false; - bool dropping_pk = false; - char *constrName; - List *unconstrained_cols = NIL; - char *colname; /* to match NOT NULL constraints when recursing */ - List *ready = NIL; - - if (readyRels == NULL) - readyRels = &ready; - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); - - conrel = table_open(ConstraintRelationId, RowExclusiveLock); - - con = (Form_pg_constraint) GETSTRUCT(constraintTup); - constrName = NameStr(con->conname); - - /* - * If the constraint is marked conislocal and is also inherited, then we - * just set conislocal false and we're done. The constraint doesn't go - * away, and we don't modify any children. - */ - if (con->conislocal && con->coninhcount > 0) - { - HeapTuple copytup; - - /* make a copy we can scribble on */ - copytup = heap_copytuple(constraintTup); - con = (Form_pg_constraint) GETSTRUCT(copytup); - con->conislocal = false; - CatalogTupleUpdate(conrel, ©tup->t_self, copytup); - - table_close(conrel, RowExclusiveLock); - - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - return conobj; - } - - /* Don't drop inherited constraints */ - if (con->coninhcount > 0 && !recursing) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", - constrName, RelationGetRelationName(rel)))); - - /* - * See if we have a NOT NULL constraint or a PRIMARY KEY. If so, we have - * more checks and actions below, so obtain the list of columns that are - * constrained by the constraint being dropped. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber colnum = extractNotNullColumn(constraintTup); - - if (colnum != InvalidAttrNumber) - unconstrained_cols = list_make1_int(colnum); - } - else if (con->contype == CONSTRAINT_PRIMARY) - { - Datum adatum; - ArrayType *arr; - int numkeys; - bool isNull; - int16 *attnums; + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); - dropping_pk = true; + is_no_inherit_constraint = con->connoinherit; + contype = con->contype; - adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey, - RelationGetDescr(conrel), &isNull); - if (isNull) - elog(ERROR, "null conkey for constraint %u", con->oid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - numkeys = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - numkeys < 0 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - attnums = (int16 *) ARR_DATA_PTR(arr); + /* + * If it's a foreign-key constraint, we'd better lock the referenced + * table and check that that's not in use, just as we've already done + * for the constrained table (else we might, eg, be dropping a trigger + * that has unfired events). But we can/must skip that in the + * self-referential case. + */ + if (contype == CONSTRAINT_FOREIGN && + con->confrelid != RelationGetRelid(rel)) + { + Relation frel; - for (int i = 0; i < numkeys; i++) - unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]); - } + /* Must match lock taken by RemoveTriggerById: */ + frel = table_open(con->confrelid, AccessExclusiveLock); + CheckTableNotInUse(frel, "ALTER TABLE"); + table_close(frel, NoLock); + } - is_no_inherit_constraint = con->connoinherit; + /* + * Perform the actual constraint deletion + */ + conobj.classId = ConstraintRelationId; + conobj.objectId = con->oid; + conobj.objectSubId = 0; - /* - * If it's a foreign-key constraint, we'd better lock the referenced table - * and check that that's not in use, just as we've already done for the - * constrained table (else we might, eg, be dropping a trigger that has - * unfired events). But we can/must skip that in the self-referential - * case. - */ - if (con->contype == CONSTRAINT_FOREIGN && - con->confrelid != RelationGetRelid(rel)) - { - Relation frel; + performDeletion(&conobj, behavior, 0); - /* Must match lock taken by RemoveTriggerById: */ - frel = table_open(con->confrelid, AccessExclusiveLock); - CheckTableNotInUse(frel, "ALTER TABLE"); - table_close(frel, NoLock); + found = true; } - /* - * Perform the actual constraint deletion - */ - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - performDeletion(&conobj, behavior, 0); + systable_endscan(scan); - /* - * If this was a NOT NULL or the primary key, the constrained columns must - * have had pg_attribute.attnotnull set. See if we need to reset it, and - * do so. - */ - if (unconstrained_cols) + if (!found) { - Relation attrel; - Bitmapset *pkcols; - Bitmapset *ircols; - ListCell *lc; - - /* Make the above deletion visible */ - CommandCounterIncrement(); - - attrel = table_open(AttributeRelationId, RowExclusiveLock); - - /* - * We want to test columns for their presence in the primary key, but - * only if we're not dropping it. - */ - pkcols = dropping_pk ? NULL : - RelationGetIndexAttrBitmap(rel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); - - foreach(lc, unconstrained_cols) + if (!missing_ok) { - AttrNumber attnum = lfirst_int(lc); - HeapTuple atttup; - HeapTuple contup; - Form_pg_attribute attForm; - - /* - * Obtain pg_attribute tuple and verify conditions on it. We use - * a copy we can scribble on. - */ - atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(atttup)) - elog(ERROR, "cache lookup failed for column %d", attnum); - attForm = (Form_pg_attribute) GETSTRUCT(atttup); - - /* - * Since the above deletion has been made visible, we can now - * search for any remaining constraints on this column (or these - * columns, in the case we're dropping a multicol primary key.) - * Then, verify whether any further NOT NULL or primary key - * exists, and reset attnotnull if none. - * - * However, if this is a generated identity column, abort the - * whole thing with a specific error message, because the - * constraint is required in that case. - */ - contup = findNotNullConstraintAttnum(rel, attnum); - if (contup || - bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - continue; - - /* - * It's not valid to drop the last NOT NULL constraint for a - * GENERATED AS IDENTITY column. - */ - if (attForm->attidentity) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("column \"%s\" of relation \"%s\" is an identity column", - get_attname(RelationGetRelid(rel), attnum, - false), - RelationGetRelationName(rel))); - - /* - * It's not valid to drop the last NOT NULL constraint for the - * replica identity either. XXX make exception for FULL? - */ - if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - get_attname(RelationGetRelid(rel), lfirst_int(lc), false))); - - /* Reset attnotnull */ - if (attForm->attnotnull) - { - attForm->attnotnull = false; - CatalogTupleUpdate(attrel, &atttup->t_self, atttup); - } + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); + } + else + { + ereport(NOTICE, + (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", + constrName, RelationGetRelationName(rel)))); + table_close(conrel, RowExclusiveLock); + return; } - table_close(attrel, RowExclusiveLock); } /* * For partitioned tables, non-CHECK inherited constraints are dropped via * the dependency mechanism, so we're done here. */ - if (con->contype != CONSTRAINT_CHECK && + if (contype != CONSTRAINT_CHECK && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { table_close(conrel, RowExclusiveLock); - return conobj; + return; } /* @@ -12613,104 +12094,50 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha errmsg("cannot remove constraint from only the partitioned table when partitions exist"), errhint("Do not specify the ONLY keyword."))); - /* For NOT NULL constraints we recurse by column name */ - if (con->contype == CONSTRAINT_NOTNULL) - colname = NameStr(TupleDescAttr(RelationGetDescr(rel), - linitial_int(unconstrained_cols) - 1)->attname); - else - colname = NULL; /* keep compiler quiet */ - foreach(child, children) { Oid childrelid = lfirst_oid(child); Relation childrel; - HeapTuple tuple; - Form_pg_constraint childcon; HeapTuple copy_tuple; - SysScanDesc scan; - ScanKeyData skey[3]; - - if (list_member_oid(*readyRels, childrelid)) - continue; /* child already processed */ /* find_inheritance_children already got lock */ childrel = table_open(childrelid, NoLock); CheckTableNotInUse(childrel, "ALTER TABLE"); - /* - * We search for NOT NULL constraint by column number, and other - * constraints by name. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - bool found = false; - AttrNumber child_colnum; - HeapTuple child_tup; - - child_colnum = get_attnum(RelationGetRelid(childrel), colname); - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 1, skey); - while (HeapTupleIsValid(child_tup = systable_getnext(scan))) - { - Form_pg_constraint constr = (Form_pg_constraint) GETSTRUCT(child_tup); - AttrNumber constr_colnum; - - if (constr->contype != CONSTRAINT_NOTNULL) - continue; - constr_colnum = extractNotNullColumn(child_tup); - if (constr_colnum != child_colnum) - continue; + ScanKeyInit(&skey[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + ScanKeyInit(&skey[1], + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + ScanKeyInit(&skey[2], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(constrName)); + scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, + true, NULL, 3, skey); + + /* There can be at most one matching row */ + if (!HeapTupleIsValid(tuple = systable_getnext(scan))) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); - found = true; - break; /* found it */ - } - if (!found) /* shouldn't happen? */ - elog(ERROR, "failed to find NOT NULL constraint for column \"%s\" in table \"%s\"", - colname, RelationGetRelationName(childrel)); + copy_tuple = heap_copytuple(tuple); - copy_tuple = heap_copytuple(child_tup); - systable_endscan(scan); - } - else - { - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - ScanKeyInit(&skey[1], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(InvalidOid)); - ScanKeyInit(&skey[2], - Anum_pg_constraint_conname, - BTEqualStrategyNumber, F_NAMEEQ, - CStringGetDatum(constrName)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 3, skey); - /* There can only be one, so no need to loop */ - tuple = systable_getnext(scan); - if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, - RelationGetRelationName(childrel)))); - copy_tuple = heap_copytuple(tuple); - systable_endscan(scan); - } + systable_endscan(scan); - childcon = (Form_pg_constraint) GETSTRUCT(copy_tuple); + con = (Form_pg_constraint) GETSTRUCT(copy_tuple); - /* Right now only CHECK and NOT NULL constraints can be inherited */ - if (childcon->contype != CONSTRAINT_CHECK && - childcon->contype != CONSTRAINT_NOTNULL) - elog(ERROR, "inherited constraint is not a CHECK or NOT NULL constraint"); + /* Right now only CHECK constraints can be inherited */ + if (con->contype != CONSTRAINT_CHECK) + elog(ERROR, "inherited constraint is not a CHECK constraint"); - if (childcon->coninhcount <= 0) /* shouldn't happen */ + if (con->coninhcount <= 0) /* shouldn't happen */ elog(ERROR, "relation %u has non-inherited constraint \"%s\"", childrelid, constrName); @@ -12720,17 +12147,17 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha * If the child constraint has other definition sources, just * decrement its inheritance count; if not, recurse to delete it. */ - if (childcon->coninhcount == 1 && !childcon->conislocal) + if (con->coninhcount == 1 && !con->conislocal) { /* Time to delete this child constraint, too */ - dropconstraint_internal(childrel, copy_tuple, behavior, - recurse, true, missing_ok, readyRels, - lockmode); + ATExecDropConstraint(childrel, constrName, behavior, + true, true, + false, lockmode); } else { /* Child constraint must survive my deletion */ - childcon->coninhcount--; + con->coninhcount--; CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); /* Make update visible */ @@ -12744,8 +12171,8 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha * need to mark the inheritors' constraints as locally defined * rather than inherited. */ - childcon->coninhcount--; - childcon->conislocal = true; + con->coninhcount--; + con->conislocal = true; CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); @@ -12759,8 +12186,6 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha } table_close(conrel, RowExclusiveLock); - - return conobj; } /* @@ -14086,10 +13511,10 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, NIL, con->conname); } - else if (cmd->subtype == AT_SetAttNotNull) + else if (cmd->subtype == AT_SetNotNull) { /* - * The parser will create AT_AttSetNotNull subcommands for + * The parser will create AT_SetNotNull subcommands for * columns of PRIMARY KEY indexes/constraints, but we need * not do anything with them here, because the columns' * NOT NULL marks will already have been propagated into @@ -15833,7 +15258,6 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) SysScanDesc parent_scan; ScanKeyData parent_key; HeapTuple parent_tuple; - Oid parent_relid = RelationGetRelid(parent_rel); bool child_is_partition = false; catalog_relation = table_open(ConstraintRelationId, RowExclusiveLock); @@ -15847,7 +15271,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ScanKeyInit(&parent_key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(parent_relid)); + ObjectIdGetDatum(RelationGetRelid(parent_rel))); parent_scan = systable_beginscan(catalog_relation, ConstraintRelidTypidNameIndexId, true, NULL, 1, &parent_key); @@ -15859,8 +15283,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) HeapTuple child_tuple; bool found = false; - if (parent_con->contype != CONSTRAINT_CHECK && - parent_con->contype != CONSTRAINT_NOTNULL) + if (parent_con->contype != CONSTRAINT_CHECK) continue; /* if the parent's constraint is marked NO INHERIT, it's not inherited */ @@ -15880,30 +15303,14 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); HeapTuple child_copy; - if (child_con->contype != parent_con->contype) + if (child_con->contype != CONSTRAINT_CHECK) continue; - /* - * CHECK constraint are matched by name, NOT NULL ones by - * attribute number - */ - if (child_con->contype == CONSTRAINT_CHECK && - strcmp(NameStr(parent_con->conname), + if (strcmp(NameStr(parent_con->conname), NameStr(child_con->conname)) != 0) continue; - else if (child_con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber parent_attno = extractNotNullColumn(parent_tuple); - AttrNumber child_attno = extractNotNullColumn(child_tuple); - - if (strcmp(get_attname(parent_relid, parent_attno, false), - get_attname(RelationGetRelid(child_rel), child_attno, - false)) != 0) - continue; - } - if (child_con->contype == CONSTRAINT_CHECK && - !constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) + if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different definition for check constraint \"%s\"", @@ -16111,7 +15518,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) HeapTuple attributeTuple, constraintTuple; List *connames; - List *nncolumns; bool found; bool child_is_partition = false; @@ -16182,8 +15588,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) * this, we first need a list of the names of the parent's check * constraints. (We cheat a bit by only checking for name matches, * assuming that the expressions will match.) - * - * For NOT NULL columns, we store column numbers to match. */ catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock); ScanKeyInit(&key[0], @@ -16194,7 +15598,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) true, NULL, 1, key); connames = NIL; - nncolumns = NIL; while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { @@ -16202,8 +15605,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) if (con->contype == CONSTRAINT_CHECK) connames = lappend(connames, pstrdup(NameStr(con->conname))); - if (con->contype == CONSTRAINT_NOTNULL) - nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple)); } systable_endscan(scan); @@ -16219,40 +15620,21 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool match = false; + bool match; ListCell *lc; - /* - * Match CHECK constraints by name, NOT NULL constraints by column - * number, and ignore all others. - */ - if (con->contype == CONSTRAINT_CHECK) - { - foreach(lc, connames) - { - if (con->contype == CONSTRAINT_CHECK && - strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) - { - match = true; - break; - } - } - } - else if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber child_attno = extractNotNullColumn(constraintTuple); + if (con->contype != CONSTRAINT_CHECK) + continue; - foreach(lc, nncolumns) + match = false; + foreach(lc, connames) + { + if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) { - if (lfirst_int(lc) == child_attno) - { - match = true; - break; - } + match = true; + break; } } - else - continue; if (match) { @@ -18543,7 +17925,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, StorePartitionBound(attachrel, rel, cmd->bound); /* Ensure there exists a correct set of indexes in the partition. */ - AttachPartitionEnsureIndexes(wqueue, rel, attachrel); + AttachPartitionEnsureIndexes(rel, attachrel); /* and triggers */ CloneRowTriggersToPartition(rel, attachrel); @@ -18656,12 +18038,13 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, * partitioned table. */ static void -AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) +AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) { List *idxes; List *attachRelIdxs; Relation *attachrelIdxRels; IndexInfo **attachInfos; + int i; ListCell *cell; MemoryContext cxt; MemoryContext oldcxt; @@ -18677,13 +18060,14 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs)); /* Build arrays of all existing indexes and their IndexInfos */ + i = 0; foreach(cell, attachRelIdxs) { Oid cldIdxId = lfirst_oid(cell); - int i = foreach_current_index(cell); attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock); attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]); + i++; } /* @@ -18749,7 +18133,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) * the first matching, unattached one we find, if any, as partition of * the parent index. If we find one, we're done. */ - for (int i = 0; i < list_length(attachRelIdxs); i++) + for (i = 0; i < list_length(attachRelIdxs); i++) { Oid cldIdxId = RelationGetRelid(attachrelIdxRels[i]); Oid cldConstrOid = InvalidOid; @@ -18805,28 +18189,6 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) stmt = generateClonedIndexStmt(NULL, idxRel, attmap, &conOid); - - /* - * If the index is a primary key, mark all columns as NOT NULL if - * they aren't already. - */ - if (stmt->primary) - { - MemoryContextSwitchTo(oldcxt); - for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++) - { - AttrNumber childattno; - - childattno = get_attnum(RelationGetRelid(attachrel), - get_attname(RelationGetRelid(rel), - info->ii_IndexAttrNumbers[j], - false)); - set_attnotnull(wqueue, attachrel, childattno, - true, AccessExclusiveLock); - } - MemoryContextSwitchTo(cxt); - } - DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, RelationGetRelid(idxRel), conOid, @@ -18839,7 +18201,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) out: /* Clean up. */ - for (int i = 0; i < list_length(attachRelIdxs); i++) + for (i = 0; i < list_length(attachRelIdxs); i++) index_close(attachrelIdxRels[i], AccessShareLock); MemoryContextSwitchTo(oldcxt); MemoryContextDelete(cxt); @@ -19740,13 +19102,6 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) RelationGetRelationName(partIdx)))); } - /* - * If it's a primary key, make sure the columns in the partition are - * NOT NULL. - */ - if (parentIdx->rd_index->indisprimary) - verifyPartitionIndexNotNull(childInfo, partTbl); - /* All good -- do it */ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); if (OidIsValid(constraintOid)) @@ -19884,29 +19239,6 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl) } /* - * When attaching an index as a partition of a partitioned index which is a - * primary key, verify that all the columns in the partition are marked NOT - * NULL. - */ -static void -verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition) -{ - for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++) - { - Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition), - iinfo->ii_IndexAttrNumbers[i] - 1); - - if (!att->attnotnull) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("invalid primary key definition"), - errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.", - NameStr(att->attname), - RelationGetRelationName(partition))); - } -} - -/* * Return an OID list of constraints that reference the given relation * that are marked as having a parent constraints. */ diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index d6d67c9083..ba00b99249 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -717,10 +717,6 @@ _outConstraint(StringInfo str, const Constraint *node) case CONSTR_NOTNULL: appendStringInfoString(str, "NOT_NULL"); - WRITE_BOOL_FIELD(is_no_inherit); - WRITE_STRING_FIELD(colname); - WRITE_BOOL_FIELD(skip_validation); - WRITE_BOOL_FIELD(initially_valid); break; case CONSTR_DEFAULT: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 4c200485f3..597e5b3ea8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -390,14 +390,8 @@ _readConstraint(void) switch (local_node->contype) { case CONSTR_NULL: - /* no extra fields */ - break; - case CONSTR_NOTNULL: - READ_BOOL_FIELD(is_no_inherit); - READ_STRING_FIELD(colname); - READ_BOOL_FIELD(skip_validation); - READ_BOOL_FIELD(initially_valid); + /* no extra fields */ break; case CONSTR_DEFAULT: diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index 75d8445914..e3824efe9b 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -1644,8 +1644,6 @@ relation_excluded_by_constraints(PlannerInfo *root, * Currently, attnotnull constraints must be treated as NO INHERIT unless * this is a partitioned table. In future we might track their * inheritance status more accurately, allowing this to be refined. - * - * XXX do we need/want to change this? */ include_notnull = (!rte->inh || rte->relkind == RELKIND_PARTITIONED_TABLE); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 86692bf395..acf6cf4866 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -4099,19 +4099,6 @@ ConstraintElem: n->initially_valid = !n->skip_validation; $$ = (Node *) n; } - | NOT NULL_P ColId ConstraintAttributeSpec - { - Constraint *n = makeNode(Constraint); - - n->contype = CONSTR_NOTNULL; - n->location = @1; - n->colname = $3; - processCASbits($4, @4, "NOT NULL", - NULL, NULL, NULL, - &n->is_no_inherit, yyscanner); - n->initially_valid = !n->skip_validation; - $$ = (Node *) n; - } | UNIQUE opt_unique_null_treatment '(' columnList ')' opt_c_include opt_definition OptConsTableSpace ConstraintAttributeSpec { diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index ea0fe6f811..b0f6fe4fa6 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -83,7 +83,6 @@ typedef struct List *ckconstraints; /* CHECK constraints */ List *fkconstraints; /* FOREIGN KEY constraints */ List *ixconstraints; /* index-creating constraints */ - List *nnconstraints; /* NOT NULL constraints */ List *likeclauses; /* LIKE clauses that need post-processing */ List *extstats; /* cloned extended statistics */ List *blist; /* "before list" of things to do before @@ -245,7 +244,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; - cxt.nnconstraints = NIL; cxt.likeclauses = NIL; cxt.extstats = NIL; cxt.blist = NIL; @@ -350,7 +348,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) */ stmt->tableElts = cxt.columns; stmt->constraints = cxt.ckconstraints; - stmt->nnconstraints = cxt.nnconstraints; result = lappend(cxt.blist, stmt); result = list_concat(result, cxt.alist); @@ -540,7 +537,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) bool saw_default; bool saw_identity; bool saw_generated; - bool need_notnull = false; ListCell *clist; cxt->columns = lappend(cxt->columns, column); @@ -638,8 +634,10 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) constraint->cooked_expr = NULL; column->constraints = lappend(column->constraints, constraint); - /* have a NOT NULL constraint added later */ - need_notnull = true; + constraint = makeNode(Constraint); + constraint->contype = CONSTR_NOTNULL; + constraint->location = -1; + column->constraints = lappend(column->constraints, constraint); } /* Process column constraints, if any... */ @@ -657,7 +655,7 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) switch (constraint->contype) { case CONSTR_NULL: - if ((saw_nullable && column->is_not_null) || need_notnull) + if (saw_nullable && column->is_not_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", @@ -669,58 +667,15 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) break; case CONSTR_NOTNULL: - - /* - * For NOT NULL declarations, we need to mark the column as - * not nullable, and set things up to have a CHECK constraint - * created. Also, duplicate NOT NULL declarations are not - * allowed. - */ - if (saw_nullable) - { - if (!column->is_not_null) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", - column->colname, cxt->relation->relname), - parser_errposition(cxt->pstate, - constraint->location))); - else - ereport(ERROR, - errcode(ERRCODE_SYNTAX_ERROR), - errmsg("redundant NOT NULL declarations for column \"%s\" of table \"%s\"", - column->colname, cxt->relation->relname), - parser_errposition(cxt->pstate, - constraint->location)); - } - - /* - * If this is the first time we see this column being marked - * not null, keep track to later add a NOT NULL constraint. - */ - if (!column->is_not_null) - { - Constraint *notnull; - - column->is_not_null = true; - saw_nullable = true; - - notnull = makeNode(Constraint); - notnull->contype = CONSTR_NOTNULL; - notnull->conname = constraint->conname; - notnull->deferrable = false; - notnull->initdeferred = false; - notnull->location = -1; - notnull->colname = column->colname; - notnull->skip_validation = false; - notnull->initially_valid = true; - - cxt->nnconstraints = lappend(cxt->nnconstraints, notnull); - - /* Don't need this anymore, if we had it */ - need_notnull = false; - } - + if (saw_nullable && !column->is_not_null) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", + column->colname, cxt->relation->relname), + parser_errposition(cxt->pstate, + constraint->location))); + column->is_not_null = true; + saw_nullable = true; break; case CONSTR_DEFAULT: @@ -770,19 +725,16 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) column->identity = constraint->generated_when; saw_identity = true; - /* - * Identity columns are always NOT NULL, but we may have a - * constraint already. - */ - if (!saw_nullable) - need_notnull = true; - else if (!column->is_not_null) + /* An identity column is implicitly NOT NULL */ + if (saw_nullable && !column->is_not_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", column->colname, cxt->relation->relname), parser_errposition(cxt->pstate, constraint->location))); + column->is_not_null = true; + saw_nullable = true; break; } @@ -806,11 +758,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) case CONSTR_CHECK: cxt->ckconstraints = lappend(cxt->ckconstraints, constraint); - - /* - * XXX If the user says CHECK (IS NOT NULL), should we turn - * that into a regular NOT NULL constraint? - */ break; case CONSTR_PRIMARY: @@ -894,29 +841,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) } /* - * If we need a NOT NULL constraint for SERIAL or IDENTITY, and one was - * not explicitly specified, add one now. - */ - if (need_notnull && !(saw_nullable && column->is_not_null)) - { - Constraint *notnull; - - column->is_not_null = true; - - notnull = makeNode(Constraint); - notnull->contype = CONSTR_NOTNULL; - notnull->conname = NULL; - notnull->deferrable = false; - notnull->initdeferred = false; - notnull->location = -1; - notnull->colname = column->colname; - notnull->skip_validation = false; - notnull->initially_valid = true; - - cxt->nnconstraints = lappend(cxt->nnconstraints, notnull); - } - - /* * If needed, generate ALTER FOREIGN TABLE ALTER COLUMN statement to add * per-column foreign data wrapper options to this column after creation. */ @@ -991,10 +915,6 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint) cxt->ckconstraints = lappend(cxt->ckconstraints, constraint); break; - case CONSTR_NOTNULL: - cxt->nnconstraints = lappend(cxt->nnconstraints, constraint); - break; - case CONSTR_FOREIGN: if (cxt->isforeign) ereport(ERROR, @@ -1006,6 +926,7 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint) break; case CONSTR_NULL: + case CONSTR_NOTNULL: case CONSTR_DEFAULT: case CONSTR_ATTR_DEFERRABLE: case CONSTR_ATTR_NOT_DEFERRABLE: @@ -1041,7 +962,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla AclResult aclresult; char *comment; ParseCallbackState pcbstate; - bool process_notnull_constraints = false; setup_parser_errposition_callback(&pcbstate, cxt->pstate, table_like_clause->relation->location); @@ -1123,8 +1043,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla def->inhcount = 0; def->is_local = true; def->is_not_null = attribute->attnotnull; - if (attribute->attnotnull) - process_notnull_constraints = true; def->is_from_type = false; def->storage = 0; def->raw_default = NULL; @@ -1206,19 +1124,14 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla * we don't yet know what column numbers the copied columns will have in * the finished table. If any of those options are specified, add the * LIKE clause to cxt->likeclauses so that expandTableLikeClause will be - * called after we do know that; in addition, do that if there are any NOT - * NULL constraints, because those must be propagated even if not - * explicitly requested. - * - * In order for this to work, we remember the relation OID so that + * called after we do know that. Also, remember the relation OID so that * expandTableLikeClause is certain to open the same table. */ - if ((table_like_clause->options & + if (table_like_clause->options & (CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED | CREATE_TABLE_LIKE_CONSTRAINTS | - CREATE_TABLE_LIKE_INDEXES)) || - process_notnull_constraints) + CREATE_TABLE_LIKE_INDEXES)) { table_like_clause->relationOid = RelationGetRelid(relation); cxt->likeclauses = lappend(cxt->likeclauses, table_like_clause); @@ -1290,7 +1203,6 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) TupleConstr *constr; AttrMap *attmap; char *comment; - ListCell *lc; /* * Open the relation referenced by the LIKE clause. We should still have @@ -1471,20 +1383,6 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) } /* - * Copy NOT NULL constraints, too (these do not require any option to have - * been given). - */ - foreach(lc, RelationGetNotNullConstraints(relation, false)) - { - AlterTableCmd *atsubcmd; - - atsubcmd = makeNode(AlterTableCmd); - atsubcmd->subtype = AT_AddConstraint; - atsubcmd->def = (Node *) lfirst_node(Constraint, lc); - atsubcmds = lappend(atsubcmds, atsubcmd); - } - - /* * If we generated any ALTER TABLE actions above, wrap them into a single * ALTER TABLE command. Stick it at the front of the result, so it runs * before any CommentStmts we made above. @@ -2161,12 +2059,10 @@ transformIndexConstraints(CreateStmtContext *cxt) ListCell *lc; /* - * Run through the constraints that need to generate an index, and do so. - * - * For PRIMARY KEY, in addition we set each column's attnotnull flag true. - * We do not create a separate CHECK (IS NOT NULL) constraint, as that - * would be redundant: the PRIMARY KEY constraint itself fulfills that - * role. Other constraint types don't need any NOT NULL markings. + * Run through the constraints that need to generate an index. For PRIMARY + * KEY, mark each column as NOT NULL and create an index. For UNIQUE or + * EXCLUDE, create an index as for PRIMARY KEY, but do not insist on NOT + * NULL. */ foreach(lc, cxt->ixconstraints) { @@ -2240,7 +2136,9 @@ transformIndexConstraints(CreateStmtContext *cxt) } /* - * Now append all the IndexStmts to cxt->alist. + * Now append all the IndexStmts to cxt->alist. If we generated an ALTER + * TABLE SET NOT NULL statement to support a primary key, it's already in + * cxt->alist. */ cxt->alist = list_concat(cxt->alist, finalindexlist); } @@ -2248,10 +2146,12 @@ transformIndexConstraints(CreateStmtContext *cxt) /* * transformIndexConstraint * Transform one UNIQUE, PRIMARY KEY, or EXCLUDE constraint for - * transformIndexConstraints. An IndexStmt is returned. + * transformIndexConstraints. * - * For a PRIMARY KEY constraint, we additionally force the columns to be - * marked as NOT NULL, without producing a CHECK (IS NOT NULL) constraint. + * We return an IndexStmt. For a PRIMARY KEY constraint, we additionally + * produce NOT NULL constraints, either by marking ColumnDefs in cxt->columns + * as is_not_null or by adding an ALTER TABLE SET NOT NULL command to + * cxt->alist. */ static IndexStmt * transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) @@ -2517,6 +2417,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { char *key = strVal(lfirst(lc)); bool found = false; + bool forced_not_null = false; ColumnDef *column = NULL; ListCell *columns; IndexElem *iparam; @@ -2537,14 +2438,13 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) * column is defined in the new table. For PRIMARY KEY, we * can apply the NOT NULL constraint cheaply here ... unless * the column is marked is_from_type, in which case marking it - * here would be ineffective (see MergeAttributes). Note that - * this isn't effective in ALTER TABLE either, unless the - * column is being added in the same command. + * here would be ineffective (see MergeAttributes). */ if (constraint->contype == CONSTR_PRIMARY && !column->is_from_type) { column->is_not_null = true; + forced_not_null = true; } } else if (SystemAttributeByName(key) != NULL) @@ -2587,6 +2487,14 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) if (strcmp(key, inhname) == 0) { found = true; + + /* + * It's tempting to set forced_not_null if the + * parent column is already NOT NULL, but that + * seems unsafe because the column's NOT NULL + * marking might disappear between now and + * execution. Do the runtime check to be safe. + */ break; } } @@ -2640,11 +2548,15 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; index->indexParams = lappend(index->indexParams, iparam); - if (constraint->contype == CONSTR_PRIMARY) + /* + * For a primary-key column, also create an item for ALTER TABLE + * SET NOT NULL if we couldn't ensure it via is_not_null above. + */ + if (constraint->contype == CONSTR_PRIMARY && !forced_not_null) { AlterTableCmd *notnullcmd = makeNode(AlterTableCmd); - notnullcmd->subtype = AT_SetAttNotNull; + notnullcmd->subtype = AT_SetNotNull; notnullcmd->name = pstrdup(key); notnullcmds = lappend(notnullcmds, notnullcmd); } @@ -3416,7 +3328,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, cxt.isalter = true; cxt.columns = NIL; cxt.ckconstraints = NIL; - cxt.nnconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; cxt.likeclauses = NIL; @@ -3660,8 +3571,8 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, /* * We assume here that cxt.alist contains only IndexStmts and possibly - * AT_SetAttNotNull statements generated from primary key constraints. - * We absorb the subcommands of the latter directly. + * ALTER TABLE SET NOT NULL statements generated from primary key + * constraints. We absorb the subcommands of the latter directly. */ if (IsA(istmt, IndexStmt)) { @@ -3689,21 +3600,14 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, { newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_AddConstraint; - newcmd->def = (Node *) lfirst_node(Constraint, l); + newcmd->def = (Node *) lfirst(l); newcmds = lappend(newcmds, newcmd); } foreach(l, cxt.fkconstraints) { newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_AddConstraint; - newcmd->def = (Node *) lfirst_node(Constraint, l); - newcmds = lappend(newcmds, newcmd); - } - foreach(l, cxt.nnconstraints) - { - newcmd = makeNode(AlterTableCmd); - newcmd->subtype = AT_AddConstraint; - newcmd->def = (Node *) lfirst_node(Constraint, l); + newcmd->def = (Node *) lfirst(l); newcmds = lappend(newcmds, newcmd); } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 0242cf24b4..461735e84f 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -2472,20 +2472,6 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, conForm->connoinherit ? " NO INHERIT" : ""); break; } - case CONSTRAINT_NOTNULL: - { - AttrNumber attnum; - - attnum = extractNotNullColumn(tup); - - appendStringInfo(&buf, "NOT NULL %s", - quote_identifier(get_attname(conForm->conrelid, - attnum, false))); - if (((Form_pg_constraint) GETSTRUCT(tup))->connoinherit) - appendStringInfoString(&buf, " NO INHERIT"); - break; - } - case CONSTRAINT_TRIGGER: /* |