Browse Source

Fixed #30913 -- Added support for covering indexes on PostgreSQL 11+.

Hannes Ljungberg 5 years ago
parent
commit
8c7992f658

+ 1 - 0
django/contrib/gis/db/backends/postgis/schema.py

@@ -45,6 +45,7 @@ class PostGISSchemaEditor(DatabaseSchemaEditor):
             columns=field_column,
             extra='',
             condition='',
+            include='',
         )
 
     def _alter_column_type_sql(self, table, old_field, new_field, new_type):

+ 4 - 0
django/contrib/postgres/indexes.py

@@ -180,6 +180,10 @@ class GistIndex(PostgresIndex):
             with_params.append('fillfactor = %d' % self.fillfactor)
         return with_params
 
+    def check_supported(self, schema_editor):
+        if self.include and not schema_editor.connection.features.supports_covering_gist_indexes:
+            raise NotSupportedError('Covering GiST indexes requires PostgreSQL 12+.')
+
 
 class HashIndex(PostgresIndex):
     suffix = 'hash'

+ 2 - 0
django/db/backends/base/features.py

@@ -277,6 +277,8 @@ class BaseDatabaseFeatures:
     # Does the backend support partial indexes (CREATE INDEX ... WHERE ...)?
     supports_partial_indexes = True
     supports_functions_in_partial_indexes = True
+    # Does the backend support covering indexes (CREATE INDEX ... INCLUDE ...)?
+    supports_covering_indexes = False
 
     # Does the database allow more than one constraint or index on the same
     # field(s)?

+ 44 - 23
django/db/backends/base/schema.py

@@ -84,8 +84,8 @@ class BaseDatabaseSchemaEditor:
     sql_create_column_inline_fk = None
     sql_delete_fk = sql_delete_constraint
 
-    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s%(condition)s"
-    sql_create_unique_index = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)%(condition)s"
+    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(extra)s%(condition)s"
+    sql_create_unique_index = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(condition)s"
     sql_delete_index = "DROP INDEX %(name)s"
 
     sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
@@ -956,9 +956,17 @@ class BaseDatabaseSchemaEditor:
             return ' WHERE ' + condition
         return ''
 
+    def _index_include_sql(self, model, columns):
+        if not columns or not self.connection.features.supports_covering_indexes:
+            return ''
+        return Statement(
+            ' INCLUDE (%(columns)s)',
+            columns=Columns(model._meta.db_table, columns, self.quote_name),
+        )
+
     def _create_index_sql(self, model, fields, *, name=None, suffix='', using='',
                           db_tablespace=None, col_suffixes=(), sql=None, opclasses=(),
-                          condition=None):
+                          condition=None, include=None):
         """
         Return the SQL statement to create the index for one or several fields.
         `sql` can be specified if the syntax differs from the standard (GIS
@@ -983,6 +991,7 @@ class BaseDatabaseSchemaEditor:
             columns=self._index_columns(table, columns, col_suffixes, opclasses),
             extra=tablespace_sql,
             condition=self._index_condition_sql(condition),
+            include=self._index_include_sql(model, include),
         )
 
     def _delete_index_sql(self, model, name, sql=None):
@@ -1083,16 +1092,22 @@ class BaseDatabaseSchemaEditor:
         if deferrable == Deferrable.IMMEDIATE:
             return ' DEFERRABLE INITIALLY IMMEDIATE'
 
-    def _unique_sql(self, model, fields, name, condition=None, deferrable=None):
+    def _unique_sql(self, model, fields, name, condition=None, deferrable=None, include=None):
         if (
             deferrable and
             not self.connection.features.supports_deferrable_unique_constraints
         ):
             return None
-        if condition:
-            # Databases support conditional unique constraints via a unique
-            # index.
-            sql = self._create_unique_sql(model, fields, name=name, condition=condition)
+        if condition or include:
+            # Databases support conditional and covering unique constraints via
+            # a unique index.
+            sql = self._create_unique_sql(
+                model,
+                fields,
+                name=name,
+                condition=condition,
+                include=include,
+            )
             if sql:
                 self.deferred_sql.append(sql)
             return None
@@ -1105,10 +1120,14 @@ class BaseDatabaseSchemaEditor:
             'constraint': constraint,
         }
 
-    def _create_unique_sql(self, model, columns, name=None, condition=None, deferrable=None):
+    def _create_unique_sql(self, model, columns, name=None, condition=None, deferrable=None, include=None):
         if (
-            deferrable and
-            not self.connection.features.supports_deferrable_unique_constraints
+            (
+                deferrable and
+                not self.connection.features.supports_deferrable_unique_constraints
+            ) or
+            (condition and not self.connection.features.supports_partial_indexes) or
+            (include and not self.connection.features.supports_covering_indexes)
         ):
             return None
 
@@ -1121,9 +1140,7 @@ class BaseDatabaseSchemaEditor:
         else:
             name = self.quote_name(name)
         columns = Columns(table, columns, self.quote_name)
-        if condition:
-            if not self.connection.features.supports_partial_indexes:
-                return None
+        if condition or include:
             sql = self.sql_create_unique_index
         else:
             sql = self.sql_create_unique
@@ -1134,20 +1151,24 @@ class BaseDatabaseSchemaEditor:
             columns=columns,
             condition=self._index_condition_sql(condition),
             deferrable=self._deferrable_constraint_sql(deferrable),
+            include=self._index_include_sql(model, include),
         )
 
-    def _delete_unique_sql(self, model, name, condition=None, deferrable=None):
+    def _delete_unique_sql(self, model, name, condition=None, deferrable=None, include=None):
         if (
-            deferrable and
-            not self.connection.features.supports_deferrable_unique_constraints
+            (
+                deferrable and
+                not self.connection.features.supports_deferrable_unique_constraints
+            ) or
+            (condition and not self.connection.features.supports_partial_indexes) or
+            (include and not self.connection.features.supports_covering_indexes)
         ):
             return None
-        if condition:
-            return (
-                self._delete_constraint_sql(self.sql_delete_index, model, name)
-                if self.connection.features.supports_partial_indexes else None
-            )
-        return self._delete_constraint_sql(self.sql_delete_unique, model, name)
+        if condition or include:
+            sql = self.sql_delete_index
+        else:
+            sql = self.sql_delete_unique
+        return self._delete_constraint_sql(sql, model, name)
 
     def _check_sql(self, name, check):
         return self.sql_constraint % {

+ 2 - 0
django/db/backends/postgresql/features.py

@@ -82,3 +82,5 @@ class DatabaseFeatures(BaseDatabaseFeatures):
     has_brin_autosummarize = property(operator.attrgetter('is_postgresql_10'))
     has_websearch_to_tsquery = property(operator.attrgetter('is_postgresql_11'))
     supports_table_partitions = property(operator.attrgetter('is_postgresql_10'))
+    supports_covering_indexes = property(operator.attrgetter('is_postgresql_11'))
+    supports_covering_gist_indexes = property(operator.attrgetter('is_postgresql_12'))

+ 8 - 3
django/db/backends/postgresql/schema.py

@@ -12,9 +12,13 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
     sql_set_sequence_max = "SELECT setval('%(sequence)s', MAX(%(column)s)) FROM %(table)s"
     sql_set_sequence_owner = 'ALTER SEQUENCE %(sequence)s OWNED BY %(table)s.%(column)s'
 
-    sql_create_index = "CREATE INDEX %(name)s ON %(table)s%(using)s (%(columns)s)%(extra)s%(condition)s"
+    sql_create_index = (
+        'CREATE INDEX %(name)s ON %(table)s%(using)s '
+        '(%(columns)s)%(include)s%(extra)s%(condition)s'
+    )
     sql_create_index_concurrently = (
-        "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s (%(columns)s)%(extra)s%(condition)s"
+        'CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s '
+        '(%(columns)s)%(include)s%(extra)s%(condition)s'
     )
     sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
     sql_delete_index_concurrently = "DROP INDEX CONCURRENTLY IF EXISTS %(name)s"
@@ -197,10 +201,11 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
     def _create_index_sql(
         self, model, fields, *, name=None, suffix='', using='',
         db_tablespace=None, col_suffixes=(), sql=None, opclasses=(),
-        condition=None, concurrently=False,
+        condition=None, concurrently=False, include=None,
     ):
         sql = self.sql_create_index if not concurrently else self.sql_create_index_concurrently
         return super()._create_index_sql(
             model, fields, name=name, suffix=suffix, using=using, db_tablespace=db_tablespace,
             col_suffixes=col_suffixes, sql=sql, opclasses=opclasses, condition=condition,
+            include=include,
         )

+ 3 - 3
django/db/models/base.py

@@ -1633,6 +1633,7 @@ class Model(metaclass=ModelBase):
                     )
                 )
         fields = [field for index in cls._meta.indexes for field, _ in index.fields_orders]
+        fields += [include for index in cls._meta.indexes for include in index.include]
         errors.extend(cls._check_local_fields(fields, 'indexes'))
         return errors
 
@@ -1926,10 +1927,9 @@ class Model(metaclass=ModelBase):
                         id='models.W038',
                     )
                 )
-            fields = (
-                field
+            fields = chain.from_iterable(
+                (*constraint.fields, *constraint.include)
                 for constraint in cls._meta.constraints if isinstance(constraint, UniqueConstraint)
-                for field in constraint.fields
             )
             errors.extend(cls._check_local_fields(fields, 'constraints'))
         return errors

+ 16 - 5
django/db/models/constraints.py

@@ -77,7 +77,7 @@ class Deferrable(Enum):
 
 
 class UniqueConstraint(BaseConstraint):
-    def __init__(self, *, fields, name, condition=None, deferrable=None):
+    def __init__(self, *, fields, name, condition=None, deferrable=None, include=None):
         if not fields:
             raise ValueError('At least one field is required to define a unique constraint.')
         if not isinstance(condition, (type(None), Q)):
@@ -90,9 +90,12 @@ class UniqueConstraint(BaseConstraint):
             raise ValueError(
                 'UniqueConstraint.deferrable must be a Deferrable instance.'
             )
+        if not isinstance(include, (type(None), list, tuple)):
+            raise ValueError('UniqueConstraint.include must be a list or tuple.')
         self.fields = tuple(fields)
         self.condition = condition
         self.deferrable = deferrable
+        self.include = tuple(include) if include else ()
         super().__init__(name)
 
     def _get_condition_sql(self, model, schema_editor):
@@ -106,31 +109,36 @@ class UniqueConstraint(BaseConstraint):
 
     def constraint_sql(self, model, schema_editor):
         fields = [model._meta.get_field(field_name).column for field_name in self.fields]
+        include = [model._meta.get_field(field_name).column for field_name in self.include]
         condition = self._get_condition_sql(model, schema_editor)
         return schema_editor._unique_sql(
             model, fields, self.name, condition=condition,
-            deferrable=self.deferrable,
+            deferrable=self.deferrable, include=include,
         )
 
     def create_sql(self, model, schema_editor):
         fields = [model._meta.get_field(field_name).column for field_name in self.fields]
+        include = [model._meta.get_field(field_name).column for field_name in self.include]
         condition = self._get_condition_sql(model, schema_editor)
         return schema_editor._create_unique_sql(
             model, fields, self.name, condition=condition,
-            deferrable=self.deferrable,
+            deferrable=self.deferrable, include=include,
         )
 
     def remove_sql(self, model, schema_editor):
         condition = self._get_condition_sql(model, schema_editor)
+        include = [model._meta.get_field(field_name).column for field_name in self.include]
         return schema_editor._delete_unique_sql(
             model, self.name, condition=condition, deferrable=self.deferrable,
+            include=include,
         )
 
     def __repr__(self):
-        return '<%s: fields=%r name=%r%s%s>' % (
+        return '<%s: fields=%r name=%r%s%s%s>' % (
             self.__class__.__name__, self.fields, self.name,
             '' if self.condition is None else ' condition=%s' % self.condition,
             '' if self.deferrable is None else ' deferrable=%s' % self.deferrable,
+            '' if not self.include else ' include=%s' % repr(self.include),
         )
 
     def __eq__(self, other):
@@ -139,7 +147,8 @@ class UniqueConstraint(BaseConstraint):
                 self.name == other.name and
                 self.fields == other.fields and
                 self.condition == other.condition and
-                self.deferrable == other.deferrable
+                self.deferrable == other.deferrable and
+                self.include == other.include
             )
         return super().__eq__(other)
 
@@ -150,4 +159,6 @@ class UniqueConstraint(BaseConstraint):
             kwargs['condition'] = self.condition
         if self.deferrable:
             kwargs['deferrable'] = self.deferrable
+        if self.include:
+            kwargs['include'] = self.include
         return path, args, kwargs

+ 21 - 3
django/db/models/indexes.py

@@ -11,7 +11,16 @@ class Index:
     # cross-database compatibility with Oracle)
     max_name_length = 30
 
-    def __init__(self, *, fields=(), name=None, db_tablespace=None, opclasses=(), condition=None):
+    def __init__(
+        self,
+        *,
+        fields=(),
+        name=None,
+        db_tablespace=None,
+        opclasses=(),
+        condition=None,
+        include=None,
+    ):
         if opclasses and not name:
             raise ValueError('An index must be named to use opclasses.')
         if not isinstance(condition, (type(None), Q)):
@@ -26,6 +35,10 @@ class Index:
             raise ValueError('Index.fields and Index.opclasses must have the same number of elements.')
         if not fields:
             raise ValueError('At least one field is required to define an index.')
+        if include and not name:
+            raise ValueError('A covering index must be named.')
+        if not isinstance(include, (type(None), list, tuple)):
+            raise ValueError('Index.include must be a list or tuple.')
         self.fields = list(fields)
         # A list of 2-tuple with the field name and ordering ('' or 'DESC').
         self.fields_orders = [
@@ -36,6 +49,7 @@ class Index:
         self.db_tablespace = db_tablespace
         self.opclasses = opclasses
         self.condition = condition
+        self.include = tuple(include) if include else ()
 
     def _get_condition_sql(self, model, schema_editor):
         if self.condition is None:
@@ -48,12 +62,13 @@ class Index:
 
     def create_sql(self, model, schema_editor, using='', **kwargs):
         fields = [model._meta.get_field(field_name) for field_name, _ in self.fields_orders]
+        include = [model._meta.get_field(field_name).column for field_name in self.include]
         col_suffixes = [order[1] for order in self.fields_orders]
         condition = self._get_condition_sql(model, schema_editor)
         return schema_editor._create_index_sql(
             model, fields, name=self.name, using=using, db_tablespace=self.db_tablespace,
             col_suffixes=col_suffixes, opclasses=self.opclasses, condition=condition,
-            **kwargs,
+            include=include, **kwargs,
         )
 
     def remove_sql(self, model, schema_editor, **kwargs):
@@ -69,6 +84,8 @@ class Index:
             kwargs['opclasses'] = self.opclasses
         if self.condition:
             kwargs['condition'] = self.condition
+        if self.include:
+            kwargs['include'] = self.include
         return (path, (), kwargs)
 
     def clone(self):
@@ -106,9 +123,10 @@ class Index:
             self.name = 'D%s' % self.name[1:]
 
     def __repr__(self):
-        return "<%s: fields='%s'%s>" % (
+        return "<%s: fields='%s'%s%s>" % (
             self.__class__.__name__, ', '.join(self.fields),
             '' if self.condition is None else ', condition=%s' % self.condition,
+            '' if not self.include else ", include='%s'" % ', '.join(self.include),
         )
 
     def __eq__(self, other):

+ 24 - 1
docs/ref/models/constraints.txt

@@ -73,7 +73,7 @@ constraint.
 ``UniqueConstraint``
 ====================
 
-.. class:: UniqueConstraint(*, fields, name, condition=None, deferrable=None)
+.. class:: UniqueConstraint(*, fields, name, condition=None, deferrable=None, include=None)
 
     Creates a unique constraint in the database.
 
@@ -145,3 +145,26 @@ enforced immediately after every command.
 
     Deferred unique constraints may lead to a `performance penalty
     <https://www.postgresql.org/docs/current/sql-createtable.html#id-1.9.3.85.9.4>`_.
+
+``include``
+-----------
+
+.. attribute:: UniqueConstraint.include
+
+.. versionadded:: 3.2
+
+A list or tuple of the names of the fields to be included in the covering
+unique index as non-key columns. This allows index-only scans to be used for
+queries that select only included fields (:attr:`~UniqueConstraint.include`)
+and filter only by unique fields (:attr:`~UniqueConstraint.fields`).
+
+For example::
+
+    UniqueConstraint(name='unique_booking', fields=['room', 'date'], include=['full_name'])
+
+will allow filtering on ``room`` and ``date``, also selecting ``full_name``,
+while fetching data only from the index.
+
+``include`` is supported only on PostgreSQL.
+
+Non-key columns have the same database restrictions as :attr:`Index.include`.

+ 38 - 1
docs/ref/models/indexes.txt

@@ -21,7 +21,7 @@ options`_.
 ``Index`` options
 =================
 
-.. class:: Index(fields=(), name=None, db_tablespace=None, opclasses=(), condition=None)
+.. class:: Index(fields=(), name=None, db_tablespace=None, opclasses=(), condition=None, include=None)
 
     Creates an index (B-Tree) in the database.
 
@@ -137,3 +137,40 @@ indexes records with more than 400 pages.
 
     The ``condition`` argument is ignored with MySQL and MariaDB as neither
     supports conditional indexes.
+
+``include``
+-----------
+
+.. attribute:: Index.include
+
+.. versionadded:: 3.2
+
+A list or tuple of the names of the fields to be included in the covering index
+as non-key columns. This allows index-only scans to be used for queries that
+select only included fields (:attr:`~Index.include`) and filter only by indexed
+fields (:attr:`~Index.fields`).
+
+For example::
+
+    Index(name='covering_index', fields=['headline'], include=['pub_date'])
+
+will allow filtering on ``headline``, also selecting ``pub_date``, while
+fetching data only from the index.
+
+Using ``include`` will produce a smaller index than using a multiple column
+index but with the drawback that non-key columns can not be used for sorting or
+filtering.
+
+``include`` is ignored for databases besides PostgreSQL.
+
+:attr:`Index.name` is required when using ``include``.
+
+See the PostgreSQL documentation for more details about `covering indexes`_.
+
+.. admonition:: Restrictions on PostgreSQL
+
+    PostgreSQL 11+ only supports covering B-Tree indexes, and PostgreSQL 12+
+    also supports covering :class:`GiST indexes
+    <django.contrib.postgres.indexes.GistIndex>`.
+
+.. _covering indexes: https://www.postgresql.org/docs/current/indexes-index-only-scans.html

+ 8 - 0
docs/releases/3.2.txt

@@ -185,6 +185,10 @@ Models
 * :class:`When() <django.db.models.expressions.When>` expression now allows
   using the ``condition`` argument with ``lookups``.
 
+* The new :attr:`.Index.include` and :attr:`.UniqueConstraint.include`
+  attributes allow creating covering indexes and covering unique constraints on
+  PostgreSQL 11+.
+
 Requests and Responses
 ~~~~~~~~~~~~~~~~~~~~~~
 
@@ -263,6 +267,10 @@ backends.
   * ``introspected_small_auto_field_type``
   * ``introspected_boolean_field_type``
 
+* To enable support for covering indexes (:attr:`.Index.include`) and covering
+  unique constraints (:attr:`.UniqueConstraint.include`), set
+  ``DatabaseFeatures.supports_covering_indexes`` to ``True``.
+
 :mod:`django.contrib.gis`
 -------------------------
 

+ 17 - 0
tests/constraints/models.py

@@ -81,6 +81,23 @@ class UniqueConstraintDeferrable(models.Model):
         ]
 
 
+class UniqueConstraintInclude(models.Model):
+    name = models.CharField(max_length=255)
+    color = models.CharField(max_length=32, null=True)
+
+    class Meta:
+        required_db_features = {
+            'supports_table_check_constraints',
+        }
+        constraints = [
+            models.UniqueConstraint(
+                fields=['name'],
+                name='name_include_color_uniq',
+                include=['color'],
+            ),
+        ]
+
+
 class AbstractModel(models.Model):
     age = models.IntegerField()
 

+ 60 - 1
tests/constraints/tests.py

@@ -8,7 +8,8 @@ from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
 
 from .models import (
     ChildModel, Product, UniqueConstraintConditionProduct,
-    UniqueConstraintDeferrable, UniqueConstraintProduct,
+    UniqueConstraintDeferrable, UniqueConstraintInclude,
+    UniqueConstraintProduct,
 )
 
 
@@ -181,6 +182,20 @@ class UniqueConstraintTests(TestCase):
         self.assertEqual(constraint_1, constraint_1)
         self.assertNotEqual(constraint_1, constraint_2)
 
+    def test_eq_with_include(self):
+        constraint_1 = models.UniqueConstraint(
+            fields=['foo', 'bar'],
+            name='include',
+            include=['baz_1'],
+        )
+        constraint_2 = models.UniqueConstraint(
+            fields=['foo', 'bar'],
+            name='include',
+            include=['baz_2'],
+        )
+        self.assertEqual(constraint_1, constraint_1)
+        self.assertNotEqual(constraint_1, constraint_2)
+
     def test_repr(self):
         fields = ['foo', 'bar']
         name = 'unique_fields'
@@ -214,6 +229,18 @@ class UniqueConstraintTests(TestCase):
             "deferrable=Deferrable.IMMEDIATE>",
         )
 
+    def test_repr_with_include(self):
+        constraint = models.UniqueConstraint(
+            fields=['foo', 'bar'],
+            name='include_fields',
+            include=['baz_1', 'baz_2'],
+        )
+        self.assertEqual(
+            repr(constraint),
+            "<UniqueConstraint: fields=('foo', 'bar') name='include_fields' "
+            "include=('baz_1', 'baz_2')>",
+        )
+
     def test_deconstruction(self):
         fields = ['foo', 'bar']
         name = 'unique_fields'
@@ -250,6 +277,20 @@ class UniqueConstraintTests(TestCase):
             'deferrable': models.Deferrable.DEFERRED,
         })
 
+    def test_deconstruction_with_include(self):
+        fields = ['foo', 'bar']
+        name = 'unique_fields'
+        include = ['baz_1', 'baz_2']
+        constraint = models.UniqueConstraint(fields=fields, name=name, include=include)
+        path, args, kwargs = constraint.deconstruct()
+        self.assertEqual(path, 'django.db.models.UniqueConstraint')
+        self.assertEqual(args, ())
+        self.assertEqual(kwargs, {
+            'fields': tuple(fields),
+            'name': name,
+            'include': tuple(include),
+        })
+
     def test_database_constraint(self):
         with self.assertRaises(IntegrityError):
             UniqueConstraintProduct.objects.create(name=self.p1.name, color=self.p1.color)
@@ -333,3 +374,21 @@ class UniqueConstraintTests(TestCase):
                 name='name_invalid',
                 deferrable='invalid',
             )
+
+    @skipUnlessDBFeature(
+        'supports_table_check_constraints',
+        'supports_covering_indexes',
+    )
+    def test_include_database_constraint(self):
+        UniqueConstraintInclude.objects.create(name='p1', color='red')
+        with self.assertRaises(IntegrityError):
+            UniqueConstraintInclude.objects.create(name='p1', color='blue')
+
+    def test_invalid_include_argument(self):
+        msg = 'UniqueConstraint.include must be a list or tuple.'
+        with self.assertRaisesMessage(ValueError, msg):
+            models.UniqueConstraint(
+                name='uniq_include',
+                fields=['field'],
+                include='other',
+            )

+ 121 - 0
tests/indexes/tests.py

@@ -236,6 +236,41 @@ class SchemaIndexesPostgreSQLTests(TransactionTestCase):
             cursor.execute(self.get_opclass_query % indexname)
             self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', indexname)])
 
+    @skipUnlessDBFeature('supports_covering_indexes')
+    def test_ops_class_include(self):
+        index_name = 'test_ops_class_include'
+        index = Index(
+            name=index_name,
+            fields=['body'],
+            opclasses=['text_pattern_ops'],
+            include=['headline'],
+        )
+        with connection.schema_editor() as editor:
+            editor.add_index(IndexedArticle2, index)
+        with editor.connection.cursor() as cursor:
+            cursor.execute(self.get_opclass_query % index_name)
+            self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)])
+
+    @skipUnlessDBFeature('supports_covering_indexes')
+    def test_ops_class_include_tablespace(self):
+        index_name = 'test_ops_class_include_tblspace'
+        index = Index(
+            name=index_name,
+            fields=['body'],
+            opclasses=['text_pattern_ops'],
+            include=['headline'],
+            db_tablespace='pg_default',
+        )
+        with connection.schema_editor() as editor:
+            editor.add_index(IndexedArticle2, index)
+            self.assertIn(
+                'TABLESPACE "pg_default"',
+                str(index.create_sql(IndexedArticle2, editor)),
+            )
+        with editor.connection.cursor() as cursor:
+            cursor.execute(self.get_opclass_query % index_name)
+            self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)])
+
     def test_ops_class_columns_lists_sql(self):
         index = Index(
             fields=['headline'],
@@ -417,3 +452,89 @@ class PartialIndexTests(TransactionTestCase):
                     cursor=cursor, table_name=Article._meta.db_table,
                 ))
             editor.remove_index(index=index, model=Article)
+
+
+@skipUnlessDBFeature('supports_covering_indexes')
+class CoveringIndexTests(TransactionTestCase):
+    available_apps = ['indexes']
+
+    def test_covering_index(self):
+        index = Index(
+            name='covering_headline_idx',
+            fields=['headline'],
+            include=['pub_date', 'published'],
+        )
+        with connection.schema_editor() as editor:
+            self.assertIn(
+                '(%s) INCLUDE (%s, %s)' % (
+                    editor.quote_name('headline'),
+                    editor.quote_name('pub_date'),
+                    editor.quote_name('published'),
+                ),
+                str(index.create_sql(Article, editor)),
+            )
+            editor.add_index(Article, index)
+            with connection.cursor() as cursor:
+                constraints = connection.introspection.get_constraints(
+                    cursor=cursor, table_name=Article._meta.db_table,
+                )
+                self.assertIn(index.name, constraints)
+                self.assertEqual(
+                    constraints[index.name]['columns'],
+                    ['headline', 'pub_date', 'published'],
+                )
+            editor.remove_index(Article, index)
+            with connection.cursor() as cursor:
+                self.assertNotIn(index.name, connection.introspection.get_constraints(
+                    cursor=cursor, table_name=Article._meta.db_table,
+                ))
+
+    def test_covering_partial_index(self):
+        index = Index(
+            name='covering_partial_headline_idx',
+            fields=['headline'],
+            include=['pub_date'],
+            condition=Q(pub_date__isnull=False),
+        )
+        with connection.schema_editor() as editor:
+            self.assertIn(
+                '(%s) INCLUDE (%s) WHERE %s ' % (
+                    editor.quote_name('headline'),
+                    editor.quote_name('pub_date'),
+                    editor.quote_name('pub_date'),
+                ),
+                str(index.create_sql(Article, editor)),
+            )
+            editor.add_index(Article, index)
+            with connection.cursor() as cursor:
+                constraints = connection.introspection.get_constraints(
+                    cursor=cursor, table_name=Article._meta.db_table,
+                )
+                self.assertIn(index.name, constraints)
+                self.assertEqual(
+                    constraints[index.name]['columns'],
+                    ['headline', 'pub_date'],
+                )
+            editor.remove_index(Article, index)
+            with connection.cursor() as cursor:
+                self.assertNotIn(index.name, connection.introspection.get_constraints(
+                    cursor=cursor, table_name=Article._meta.db_table,
+                ))
+
+
+@skipIfDBFeature('supports_covering_indexes')
+class CoveringIndexIgnoredTests(TransactionTestCase):
+    available_apps = ['indexes']
+
+    def test_covering_ignored(self):
+        index = Index(
+            name='test_covering_ignored',
+            fields=['headline'],
+            include=['pub_date'],
+        )
+        with connection.schema_editor() as editor:
+            editor.add_index(Article, index)
+        self.assertNotIn(
+            'INCLUDE (%s)' % editor.quote_name('headline'),
+            str(index.create_sql(Article, editor)),
+        )

+ 159 - 0
tests/invalid_models_tests/test_models.py

@@ -375,6 +375,78 @@ class IndexesTests(TestCase):
 
         self.assertEqual(Model.check(databases=self.databases), [])
 
+    def test_index_include_pointing_to_missing_field(self):
+        class Model(models.Model):
+            class Meta:
+                indexes = [
+                    models.Index(fields=['id'], include=['missing_field'], name='name'),
+                ]
+
+        self.assertEqual(Model.check(databases=self.databases), [
+            Error(
+                "'indexes' refers to the nonexistent field 'missing_field'.",
+                obj=Model,
+                id='models.E012',
+            ),
+        ])
+
+    def test_index_include_pointing_to_m2m_field(self):
+        class Model(models.Model):
+            m2m = models.ManyToManyField('self')
+
+            class Meta:
+                indexes = [models.Index(fields=['id'], include=['m2m'], name='name')]
+
+        self.assertEqual(Model.check(databases=self.databases), [
+            Error(
+                "'indexes' refers to a ManyToManyField 'm2m', but "
+                "ManyToManyFields are not permitted in 'indexes'.",
+                obj=Model,
+                id='models.E013',
+            ),
+        ])
+
+    def test_index_include_pointing_to_non_local_field(self):
+        class Parent(models.Model):
+            field1 = models.IntegerField()
+
+        class Child(Parent):
+            field2 = models.IntegerField()
+
+            class Meta:
+                indexes = [
+                    models.Index(fields=['field2'], include=['field1'], name='name'),
+                ]
+
+        self.assertEqual(Child.check(databases=self.databases), [
+            Error(
+                "'indexes' refers to field 'field1' which is not local to "
+                "model 'Child'.",
+                hint='This issue may be caused by multi-table inheritance.',
+                obj=Child,
+                id='models.E016',
+            ),
+        ])
+
+    def test_index_include_pointing_to_fk(self):
+        class Target(models.Model):
+            pass
+
+        class Model(models.Model):
+            fk_1 = models.ForeignKey(Target, models.CASCADE, related_name='target_1')
+            fk_2 = models.ForeignKey(Target, models.CASCADE, related_name='target_2')
+
+            class Meta:
+                constraints = [
+                    models.Index(
+                        fields=['id'],
+                        include=['fk_1_id', 'fk_2'],
+                        name='name',
+                    ),
+                ]
+
+        self.assertEqual(Model.check(databases=self.databases), [])
+
 
 @isolate_apps('invalid_models_tests')
 class FieldNamesTests(TestCase):
@@ -1568,3 +1640,90 @@ class ConstraintsTests(TestCase):
                 ]
 
         self.assertEqual(Model.check(databases=self.databases), [])
+
+    def test_unique_constraint_include_pointing_to_missing_field(self):
+        class Model(models.Model):
+            class Meta:
+                constraints = [
+                    models.UniqueConstraint(
+                        fields=['id'],
+                        include=['missing_field'],
+                        name='name',
+                    ),
+                ]
+
+        self.assertEqual(Model.check(databases=self.databases), [
+            Error(
+                "'constraints' refers to the nonexistent field "
+                "'missing_field'.",
+                obj=Model,
+                id='models.E012',
+            ),
+        ])
+
+    def test_unique_constraint_include_pointing_to_m2m_field(self):
+        class Model(models.Model):
+            m2m = models.ManyToManyField('self')
+
+            class Meta:
+                constraints = [
+                    models.UniqueConstraint(
+                        fields=['id'],
+                        include=['m2m'],
+                        name='name',
+                    ),
+                ]
+
+        self.assertEqual(Model.check(databases=self.databases), [
+            Error(
+                "'constraints' refers to a ManyToManyField 'm2m', but "
+                "ManyToManyFields are not permitted in 'constraints'.",
+                obj=Model,
+                id='models.E013',
+            ),
+        ])
+
+    def test_unique_constraint_include_pointing_to_non_local_field(self):
+        class Parent(models.Model):
+            field1 = models.IntegerField()
+
+        class Child(Parent):
+            field2 = models.IntegerField()
+
+            class Meta:
+                constraints = [
+                    models.UniqueConstraint(
+                        fields=['field2'],
+                        include=['field1'],
+                        name='name',
+                    ),
+                ]
+
+        self.assertEqual(Child.check(databases=self.databases), [
+            Error(
+                "'constraints' refers to field 'field1' which is not local to "
+                "model 'Child'.",
+                hint='This issue may be caused by multi-table inheritance.',
+                obj=Child,
+                id='models.E016',
+            ),
+        ])
+
+    def test_unique_constraint_include_pointing_to_fk(self):
+        class Target(models.Model):
+            pass
+
+        class Model(models.Model):
+            fk_1 = models.ForeignKey(Target, models.CASCADE, related_name='target_1')
+            fk_2 = models.ForeignKey(Target, models.CASCADE, related_name='target_2')
+
+            class Meta:
+                constraints = [
+                    models.UniqueConstraint(
+                        fields=['id'],
+                        include=['fk_1_id', 'fk_2'],
+                        name='name',
+                    ),
+                ]
+
+        self.assertEqual(Model.check(databases=self.databases), [])

+ 124 - 0
tests/migrations/test_operations.py

@@ -448,6 +448,48 @@ class OperationTests(OperationTestBase):
             [deferred_unique_constraint],
         )
 
+    @skipUnlessDBFeature('supports_covering_indexes')
+    def test_create_model_with_covering_unique_constraint(self):
+        covering_unique_constraint = models.UniqueConstraint(
+            fields=['pink'],
+            include=['weight'],
+            name='test_constraint_pony_pink_covering_weight',
+        )
+        operation = migrations.CreateModel(
+            'Pony',
+            [
+                ('id', models.AutoField(primary_key=True)),
+                ('pink', models.IntegerField(default=3)),
+                ('weight', models.FloatField()),
+            ],
+            options={'constraints': [covering_unique_constraint]},
+        )
+        project_state = ProjectState()
+        new_state = project_state.clone()
+        operation.state_forwards('test_crmo', new_state)
+        self.assertEqual(len(new_state.models['test_crmo', 'pony'].options['constraints']), 1)
+        self.assertTableNotExists('test_crmo_pony')
+        # Create table.
+        with connection.schema_editor() as editor:
+            operation.database_forwards('test_crmo', editor, project_state, new_state)
+        self.assertTableExists('test_crmo_pony')
+        Pony = new_state.apps.get_model('test_crmo', 'Pony')
+        Pony.objects.create(pink=1, weight=4.0)
+        with self.assertRaises(IntegrityError):
+            Pony.objects.create(pink=1, weight=7.0)
+        # Reversal.
+        with connection.schema_editor() as editor:
+            operation.database_backwards('test_crmo', editor, new_state, project_state)
+        self.assertTableNotExists('test_crmo_pony')
+        # Deconstruction.
+        definition = operation.deconstruct()
+        self.assertEqual(definition[0], 'CreateModel')
+        self.assertEqual(definition[1], [])
+        self.assertEqual(
+            definition[2]['options']['constraints'],
+            [covering_unique_constraint],
+        )
+
     def test_create_model_managers(self):
         """
         The managers on a model are set.
@@ -2236,6 +2278,88 @@ class OperationTests(OperationTestBase):
             'name': 'deferred_pink_constraint_rm',
         })
 
+    def test_add_covering_unique_constraint(self):
+        app_label = 'test_addcovering_uc'
+        project_state = self.set_up_test_model(app_label)
+        covering_unique_constraint = models.UniqueConstraint(
+            fields=['pink'],
+            name='covering_pink_constraint_add',
+            include=['weight'],
+        )
+        operation = migrations.AddConstraint('Pony', covering_unique_constraint)
+        self.assertEqual(
+            operation.describe(),
+            'Create constraint covering_pink_constraint_add on model Pony',
+        )
+        # Add constraint.
+        new_state = project_state.clone()
+        operation.state_forwards(app_label, new_state)
+        self.assertEqual(len(new_state.models[app_label, 'pony'].options['constraints']), 1)
+        Pony = new_state.apps.get_model(app_label, 'Pony')
+        self.assertEqual(len(Pony._meta.constraints), 1)
+        with connection.schema_editor() as editor:
+            operation.database_forwards(app_label, editor, project_state, new_state)
+        Pony.objects.create(pink=1, weight=4.0)
+        if connection.features.supports_covering_indexes:
+            with self.assertRaises(IntegrityError):
+                Pony.objects.create(pink=1, weight=4.0)
+        else:
+            Pony.objects.create(pink=1, weight=4.0)
+        # Reversal.
+        with connection.schema_editor() as editor:
+            operation.database_backwards(app_label, editor, new_state, project_state)
+        # Constraint doesn't work.
+        Pony.objects.create(pink=1, weight=4.0)
+        # Deconstruction.
+        definition = operation.deconstruct()
+        self.assertEqual(definition[0], 'AddConstraint')
+        self.assertEqual(definition[1], [])
+        self.assertEqual(
+            definition[2],
+            {'model_name': 'Pony', 'constraint': covering_unique_constraint},
+        )
+
+    def test_remove_covering_unique_constraint(self):
+        app_label = 'test_removecovering_uc'
+        covering_unique_constraint = models.UniqueConstraint(
+            fields=['pink'],
+            name='covering_pink_constraint_rm',
+            include=['weight'],
+        )
+        project_state = self.set_up_test_model(app_label, constraints=[covering_unique_constraint])
+        operation = migrations.RemoveConstraint('Pony', covering_unique_constraint.name)
+        self.assertEqual(
+            operation.describe(),
+            'Remove constraint covering_pink_constraint_rm from model Pony',
+        )
+        # Remove constraint.
+        new_state = project_state.clone()
+        operation.state_forwards(app_label, new_state)
+        self.assertEqual(len(new_state.models[app_label, 'pony'].options['constraints']), 0)
+        Pony = new_state.apps.get_model(app_label, 'Pony')
+        self.assertEqual(len(Pony._meta.constraints), 0)
+        with connection.schema_editor() as editor:
+            operation.database_forwards(app_label, editor, project_state, new_state)
+        # Constraint doesn't work.
+        Pony.objects.create(pink=1, weight=4.0)
+        Pony.objects.create(pink=1, weight=4.0).delete()
+        # Reversal.
+        with connection.schema_editor() as editor:
+            operation.database_backwards(app_label, editor, new_state, project_state)
+        if connection.features.supports_covering_indexes:
+            with self.assertRaises(IntegrityError):
+                Pony.objects.create(pink=1, weight=4.0)
+        else:
+            Pony.objects.create(pink=1, weight=4.0)
+        # Deconstruction.
+        definition = operation.deconstruct()
+        self.assertEqual(definition[0], 'RemoveConstraint')
+        self.assertEqual(definition[1], [])
+        self.assertEqual(definition[2], {
+            'model_name': 'Pony',
+            'name': 'covering_pink_constraint_rm',
+        })
+
     def test_alter_model_options(self):
         """
         Tests the AlterModelOptions operation.

+ 38 - 0
tests/model_indexes/tests.py

@@ -17,9 +17,18 @@ class SimpleIndexesTests(SimpleTestCase):
         index = models.Index(fields=['title'])
         multi_col_index = models.Index(fields=['title', 'author'])
         partial_index = models.Index(fields=['title'], name='long_books_idx', condition=models.Q(pages__gt=400))
+        covering_index = models.Index(
+            fields=['title'],
+            name='include_idx',
+            include=['author', 'pages'],
+        )
         self.assertEqual(repr(index), "<Index: fields='title'>")
         self.assertEqual(repr(multi_col_index), "<Index: fields='title, author'>")
         self.assertEqual(repr(partial_index), "<Index: fields='title', condition=(AND: ('pages__gt', 400))>")
+        self.assertEqual(
+            repr(covering_index),
+            "<Index: fields='title', include='author, pages'>",
+        )
 
     def test_eq(self):
         index = models.Index(fields=['title'])
@@ -65,6 +74,16 @@ class SimpleIndexesTests(SimpleTestCase):
         with self.assertRaisesMessage(ValueError, 'Index.condition must be a Q instance.'):
             models.Index(condition='invalid', name='long_book_idx')
 
+    def test_include_requires_list_or_tuple(self):
+        msg = 'Index.include must be a list or tuple.'
+        with self.assertRaisesMessage(ValueError, msg):
+            models.Index(name='test_include', fields=['field'], include='other')
+
+    def test_include_requires_index_name(self):
+        msg = 'A covering index must be named.'
+        with self.assertRaisesMessage(ValueError, msg):
+            models.Index(fields=['field'], include=['other'])
+
     def test_name_auto_generation(self):
         index = models.Index(fields=['author'])
         index.set_name_with_model(Book)
@@ -128,6 +147,25 @@ class SimpleIndexesTests(SimpleTestCase):
             }
         )
 
+    def test_deconstruct_with_include(self):
+        index = models.Index(
+            name='book_include_idx',
+            fields=['title'],
+            include=['author'],
+        )
+        index.set_name_with_model(Book)
+        path, args, kwargs = index.deconstruct()
+        self.assertEqual(path, 'django.db.models.Index')
+        self.assertEqual(args, ())
+        self.assertEqual(
+            kwargs,
+            {
+                'fields': ['title'],
+                'name': 'model_index_title_196f42_idx',
+                'include': ('author',),
+            },
+        )
+
     def test_clone(self):
         index = models.Index(fields=['title'])
         new_index = index.clone()

+ 28 - 1
tests/postgres_tests/test_indexes.py

@@ -11,7 +11,7 @@ from django.test import skipUnlessDBFeature
 from django.test.utils import register_lookup
 
 from . import PostgreSQLSimpleTestCase, PostgreSQLTestCase
-from .models import CharFieldModel, IntegerArrayModel
+from .models import CharFieldModel, IntegerArrayModel, Scene
 
 
 class IndexTestMixin:
@@ -373,6 +373,33 @@ class SchemaTests(PostgreSQLTestCase):
             editor.remove_index(CharFieldModel, index)
         self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table))
 
+    @skipUnlessDBFeature('supports_covering_gist_indexes')
+    def test_gist_include(self):
+        index_name = 'scene_gist_include_setting'
+        index = GistIndex(name=index_name, fields=['scene'], include=['setting'])
+        with connection.schema_editor() as editor:
+            editor.add_index(Scene, index)
+        constraints = self.get_constraints(Scene._meta.db_table)
+        self.assertIn(index_name, constraints)
+        self.assertEqual(constraints[index_name]['type'], GistIndex.suffix)
+        self.assertEqual(constraints[index_name]['columns'], ['scene', 'setting'])
+        with connection.schema_editor() as editor:
+            editor.remove_index(Scene, index)
+        self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
+
+    def test_gist_include_not_supported(self):
+        index_name = 'gist_include_exception'
+        index = GistIndex(fields=['scene'], name=index_name, include=['setting'])
+        msg = 'Covering GiST indexes requires PostgreSQL 12+.'
+        with self.assertRaisesMessage(NotSupportedError, msg):
+            with mock.patch(
+                'django.db.backends.postgresql.features.DatabaseFeatures.supports_covering_gist_indexes',
+                False,
+            ):
+                with connection.schema_editor() as editor:
+                    editor.add_index(Scene, index)
+        self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
+
     def test_hash_index(self):
         # Ensure the table is there and doesn't have an index.
         self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table))

+ 1 - 0
tests/schema/tests.py

@@ -2587,6 +2587,7 @@ class SchemaTests(TransactionTestCase):
                     "columns": editor.quote_name(column),
                     "extra": "",
                     "condition": "",
+                    "include": "",
                 }
             )
             self.assertIn(expected_constraint_name, self.get_constraints(model._meta.db_table))