123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607 |
- from __future__ import unicode_literals
- from bisect import bisect
- from collections import OrderedDict
- from django.apps import apps
- from django.conf import settings
- from django.core.exceptions import FieldDoesNotExist
- from django.db.models.fields.related import ManyToManyRel
- from django.db.models.fields import AutoField
- from django.db.models.fields.proxy import OrderWrt
- from django.utils import six
- from django.utils.encoding import force_text, smart_text, python_2_unicode_compatible
- from django.utils.functional import cached_property
- from django.utils.text import camel_case_to_spaces
- from django.utils.translation import activate, deactivate_all, get_language, string_concat
- DEFAULT_NAMES = ('verbose_name', 'verbose_name_plural', 'db_table', 'ordering',
- 'unique_together', 'permissions', 'get_latest_by',
- 'order_with_respect_to', 'app_label', 'db_tablespace',
- 'abstract', 'managed', 'proxy', 'swappable', 'auto_created',
- 'index_together', 'apps', 'default_permissions',
- 'select_on_save', 'default_related_name')
- def normalize_together(option_together):
- """
- option_together can be either a tuple of tuples, or a single
- tuple of two strings. Normalize it to a tuple of tuples, so that
- calling code can uniformly expect that.
- """
- try:
- if not option_together:
- return ()
- if not isinstance(option_together, (tuple, list)):
- raise TypeError
- first_element = next(iter(option_together))
- if not isinstance(first_element, (tuple, list)):
- option_together = (option_together,)
- # Normalize everything to tuples
- return tuple(tuple(ot) for ot in option_together)
- except TypeError:
- # If the value of option_together isn't valid, return it
- # verbatim; this will be picked up by the check framework later.
- return option_together
- @python_2_unicode_compatible
- class Options(object):
- def __init__(self, meta, app_label=None):
- self.local_fields = []
- self.local_many_to_many = []
- self.virtual_fields = []
- self.model_name = None
- self.verbose_name = None
- self.verbose_name_plural = None
- self.db_table = ''
- self.ordering = []
- self.unique_together = []
- self.index_together = []
- self.select_on_save = False
- self.default_permissions = ('add', 'change', 'delete')
- self.permissions = []
- self.object_name = None
- self.app_label = app_label
- self.get_latest_by = None
- self.order_with_respect_to = None
- self.db_tablespace = settings.DEFAULT_TABLESPACE
- self.meta = meta
- self.pk = None
- self.has_auto_field = False
- self.auto_field = None
- self.abstract = False
- self.managed = True
- self.proxy = False
- # For any class that is a proxy (including automatically created
- # classes for deferred object loading), proxy_for_model tells us
- # which class this model is proxying. Note that proxy_for_model
- # can create a chain of proxy models. For non-proxy models, the
- # variable is always None.
- self.proxy_for_model = None
- # For any non-abstract class, the concrete class is the model
- # in the end of the proxy_for_model chain. In particular, for
- # concrete models, the concrete_model is always the class itself.
- self.concrete_model = None
- self.swappable = None
- self.parents = OrderedDict()
- self.auto_created = False
- # To handle various inheritance situations, we need to track where
- # managers came from (concrete or abstract base classes). `managers`
- # keeps a list of 3-tuples of the form:
- # (creation_counter, instance, abstract(=True))
- self.managers = []
- # List of all lookups defined in ForeignKey 'limit_choices_to' options
- # from *other* models. Needed for some admin checks. Internal use only.
- self.related_fkey_lookups = []
- # A custom app registry to use, if you're making a separate model set.
- self.apps = apps
- self.default_related_name = None
- @property
- def app_config(self):
- # Don't go through get_app_config to avoid triggering imports.
- return self.apps.app_configs.get(self.app_label)
- @property
- def installed(self):
- return self.app_config is not None
- @property
- def abstract_managers(self):
- return [
- (counter, instance.name, instance) for counter, instance, abstract
- in self.managers if abstract
- ]
- @property
- def concrete_managers(self):
- return [
- (counter, instance.name, instance) for counter, instance, abstract
- in self.managers if not abstract
- ]
- def contribute_to_class(self, cls, name):
- from django.db import connection
- from django.db.backends.utils import truncate_name
- cls._meta = self
- self.model = cls
- # First, construct the default values for these options.
- self.object_name = cls.__name__
- self.model_name = self.object_name.lower()
- self.verbose_name = camel_case_to_spaces(self.object_name)
- # Store the original user-defined values for each option,
- # for use when serializing the model definition
- self.original_attrs = {}
- # Next, apply any overridden values from 'class Meta'.
- if self.meta:
- meta_attrs = self.meta.__dict__.copy()
- for name in self.meta.__dict__:
- # Ignore any private attributes that Django doesn't care about.
- # NOTE: We can't modify a dictionary's contents while looping
- # over it, so we loop over the *original* dictionary instead.
- if name.startswith('_'):
- del meta_attrs[name]
- for attr_name in DEFAULT_NAMES:
- if attr_name in meta_attrs:
- setattr(self, attr_name, meta_attrs.pop(attr_name))
- self.original_attrs[attr_name] = getattr(self, attr_name)
- elif hasattr(self.meta, attr_name):
- setattr(self, attr_name, getattr(self.meta, attr_name))
- self.original_attrs[attr_name] = getattr(self, attr_name)
- ut = meta_attrs.pop('unique_together', self.unique_together)
- self.unique_together = normalize_together(ut)
- it = meta_attrs.pop('index_together', self.index_together)
- self.index_together = normalize_together(it)
- # verbose_name_plural is a special case because it uses a 's'
- # by default.
- if self.verbose_name_plural is None:
- self.verbose_name_plural = string_concat(self.verbose_name, 's')
- # Any leftover attributes must be invalid.
- if meta_attrs != {}:
- raise TypeError("'class Meta' got invalid attribute(s): %s" % ','.join(meta_attrs.keys()))
- else:
- self.verbose_name_plural = string_concat(self.verbose_name, 's')
- del self.meta
- # If the db_table wasn't provided, use the app_label + model_name.
- if not self.db_table:
- self.db_table = "%s_%s" % (self.app_label, self.model_name)
- self.db_table = truncate_name(self.db_table, connection.ops.max_name_length())
- def _prepare(self, model):
- if self.order_with_respect_to:
- self.order_with_respect_to = self.get_field(self.order_with_respect_to)
- self.ordering = ('_order',)
- if not any(isinstance(field, OrderWrt) for field in model._meta.local_fields):
- model.add_to_class('_order', OrderWrt())
- else:
- self.order_with_respect_to = None
- if self.pk is None:
- if self.parents:
- # Promote the first parent link in lieu of adding yet another
- # field.
- field = next(six.itervalues(self.parents))
- # Look for a local field with the same name as the
- # first parent link. If a local field has already been
- # created, use it instead of promoting the parent
- already_created = [fld for fld in self.local_fields if fld.name == field.name]
- if already_created:
- field = already_created[0]
- field.primary_key = True
- self.setup_pk(field)
- else:
- auto = AutoField(verbose_name='ID', primary_key=True,
- auto_created=True)
- model.add_to_class('id', auto)
- def add_field(self, field):
- # Insert the given field in the order in which it was created, using
- # the "creation_counter" attribute of the field.
- # Move many-to-many related fields from self.fields into
- # self.many_to_many.
- if field.rel and isinstance(field.rel, ManyToManyRel):
- self.local_many_to_many.insert(bisect(self.local_many_to_many, field), field)
- if hasattr(self, '_m2m_cache'):
- del self._m2m_cache
- else:
- self.local_fields.insert(bisect(self.local_fields, field), field)
- self.setup_pk(field)
- if hasattr(self, '_field_cache'):
- del self._field_cache
- del self._field_name_cache
- # The fields, concrete_fields and local_concrete_fields are
- # implemented as cached properties for performance reasons.
- # The attrs will not exists if the cached property isn't
- # accessed yet, hence the try-excepts.
- try:
- del self.fields
- except AttributeError:
- pass
- try:
- del self.concrete_fields
- except AttributeError:
- pass
- try:
- del self.local_concrete_fields
- except AttributeError:
- pass
- if hasattr(self, '_name_map'):
- del self._name_map
- def add_virtual_field(self, field):
- self.virtual_fields.append(field)
- def setup_pk(self, field):
- if not self.pk and field.primary_key:
- self.pk = field
- field.serialize = False
- def pk_index(self):
- """
- Returns the index of the primary key field in the self.concrete_fields
- list.
- """
- return self.concrete_fields.index(self.pk)
- def setup_proxy(self, target):
- """
- Does the internal setup so that the current model is a proxy for
- "target".
- """
- self.pk = target._meta.pk
- self.proxy_for_model = target
- self.db_table = target._meta.db_table
- def __repr__(self):
- return '<Options for %s>' % self.object_name
- def __str__(self):
- return "%s.%s" % (smart_text(self.app_label), smart_text(self.model_name))
- def verbose_name_raw(self):
- """
- There are a few places where the untranslated verbose name is needed
- (so that we get the same value regardless of currently active
- locale).
- """
- lang = get_language()
- deactivate_all()
- raw = force_text(self.verbose_name)
- activate(lang)
- return raw
- verbose_name_raw = property(verbose_name_raw)
- def _swapped(self):
- """
- Has this model been swapped out for another? If so, return the model
- name of the replacement; otherwise, return None.
- For historical reasons, model name lookups using get_model() are
- case insensitive, so we make sure we are case insensitive here.
- """
- if self.swappable:
- model_label = '%s.%s' % (self.app_label, self.model_name)
- swapped_for = getattr(settings, self.swappable, None)
- if swapped_for:
- try:
- swapped_label, swapped_object = swapped_for.split('.')
- except ValueError:
- # setting not in the format app_label.model_name
- # raising ImproperlyConfigured here causes problems with
- # test cleanup code - instead it is raised in get_user_model
- # or as part of validation.
- return swapped_for
- if '%s.%s' % (swapped_label, swapped_object.lower()) not in (None, model_label):
- return swapped_for
- return None
- swapped = property(_swapped)
- @cached_property
- def fields(self):
- """
- The getter for self.fields. This returns the list of field objects
- available to this model (including through parent models).
- Callers are not permitted to modify this list, since it's a reference
- to this instance (not a copy).
- """
- try:
- self._field_name_cache
- except AttributeError:
- self._fill_fields_cache()
- return self._field_name_cache
- @cached_property
- def concrete_fields(self):
- return [f for f in self.fields if f.column is not None]
- @cached_property
- def local_concrete_fields(self):
- return [f for f in self.local_fields if f.column is not None]
- def get_fields_with_model(self):
- """
- Returns a sequence of (field, model) pairs for all fields. The "model"
- element is None for fields on the current model. Mostly of use when
- constructing queries so that we know which model a field belongs to.
- """
- try:
- self._field_cache
- except AttributeError:
- self._fill_fields_cache()
- return self._field_cache
- def get_concrete_fields_with_model(self):
- return [(field, model) for field, model in self.get_fields_with_model() if
- field.column is not None]
- def _fill_fields_cache(self):
- cache = []
- for parent in self.parents:
- for field, model in parent._meta.get_fields_with_model():
- if model:
- cache.append((field, model))
- else:
- cache.append((field, parent))
- cache.extend((f, None) for f in self.local_fields)
- self._field_cache = tuple(cache)
- self._field_name_cache = [x for x, _ in cache]
- def _many_to_many(self):
- try:
- self._m2m_cache
- except AttributeError:
- self._fill_m2m_cache()
- return list(self._m2m_cache)
- many_to_many = property(_many_to_many)
- def get_m2m_with_model(self):
- """
- The many-to-many version of get_fields_with_model().
- """
- try:
- self._m2m_cache
- except AttributeError:
- self._fill_m2m_cache()
- return list(six.iteritems(self._m2m_cache))
- def _fill_m2m_cache(self):
- cache = OrderedDict()
- for parent in self.parents:
- for field, model in parent._meta.get_m2m_with_model():
- if model:
- cache[field] = model
- else:
- cache[field] = parent
- for field in self.local_many_to_many:
- cache[field] = None
- self._m2m_cache = cache
- def get_field(self, name, many_to_many=True):
- """
- Returns the requested field by name. Raises FieldDoesNotExist on error.
- """
- to_search = (self.fields + self.many_to_many) if many_to_many else self.fields
- for f in to_search:
- if f.name == name:
- return f
- raise FieldDoesNotExist('%s has no field named %r' % (self.object_name, name))
- def get_field_by_name(self, name):
- """
- Returns the (field_object, model, direct, m2m), where field_object is
- the Field instance for the given name, model is the model containing
- this field (None for local fields), direct is True if the field exists
- on this model, and m2m is True for many-to-many relations. When
- 'direct' is False, 'field_object' is the corresponding ForeignObjectRel
- for this field (since the field doesn't have an instance associated
- with it).
- Uses a cache internally, so after the first access, this is very fast.
- """
- try:
- try:
- return self._name_map[name]
- except AttributeError:
- cache = self.init_name_map()
- return cache[name]
- except KeyError:
- raise FieldDoesNotExist('%s has no field named %r'
- % (self.object_name, name))
- def get_all_field_names(self):
- """
- Returns a list of all field names that are possible for this model
- (including reverse relation names). This is used for pretty printing
- debugging output (a list of choices), so any internal-only field names
- are not included.
- """
- try:
- cache = self._name_map
- except AttributeError:
- cache = self.init_name_map()
- names = sorted(cache.keys())
- # Internal-only names end with "+" (symmetrical m2m related names being
- # the main example). Trim them.
- return [val for val in names if not val.endswith('+')]
- def init_name_map(self):
- """
- Initialises the field name -> field object mapping.
- """
- cache = {}
- # We intentionally handle related m2m objects first so that symmetrical
- # m2m accessor names can be overridden, if necessary.
- for f, model in self.get_all_related_m2m_objects_with_model():
- cache[f.field.related_query_name()] = (f, model, False, True)
- for f, model in self.get_all_related_objects_with_model():
- cache[f.field.related_query_name()] = (f, model, False, False)
- for f, model in self.get_m2m_with_model():
- cache[f.name] = cache[f.attname] = (f, model, True, True)
- for f, model in self.get_fields_with_model():
- cache[f.name] = cache[f.attname] = (f, model, True, False)
- for f in self.virtual_fields:
- if f.rel:
- cache[f.name] = cache[f.attname] = (
- f, None if f.model == self.model else f.model, True, False)
- if apps.ready:
- self._name_map = cache
- return cache
- def get_all_related_objects(self, local_only=False, include_hidden=False,
- include_proxy_eq=False):
- return [k for k, v in self.get_all_related_objects_with_model(
- local_only=local_only, include_hidden=include_hidden,
- include_proxy_eq=include_proxy_eq)]
- def get_all_related_objects_with_model(self, local_only=False,
- include_hidden=False,
- include_proxy_eq=False):
- """
- Returns a list of (related-object, model) pairs. Similar to
- get_fields_with_model().
- """
- try:
- self._related_objects_cache
- except AttributeError:
- self._fill_related_objects_cache()
- predicates = []
- if local_only:
- predicates.append(lambda k, v: not v)
- if not include_hidden:
- predicates.append(lambda k, v: not k.field.rel.is_hidden())
- cache = (self._related_objects_proxy_cache if include_proxy_eq
- else self._related_objects_cache)
- return [t for t in cache.items() if all(p(*t) for p in predicates)]
- def _fill_related_objects_cache(self):
- cache = OrderedDict()
- parent_list = self.get_parent_list()
- for parent in self.parents:
- for obj, model in parent._meta.get_all_related_objects_with_model(include_hidden=True):
- if (obj.field.creation_counter < 0 or obj.field.rel.parent_link) and obj.model not in parent_list:
- continue
- if not model:
- cache[obj] = parent
- else:
- cache[obj] = model
- # Collect also objects which are in relation to some proxy child/parent of self.
- proxy_cache = cache.copy()
- for klass in self.apps.get_models(include_auto_created=True):
- if not klass._meta.swapped:
- for f in klass._meta.local_fields + klass._meta.virtual_fields:
- if (hasattr(f, 'rel') and f.rel and not isinstance(f.rel.to, six.string_types)
- and f.generate_reverse_relation):
- if self == f.rel.to._meta:
- cache[f.rel] = None
- proxy_cache[f.rel] = None
- elif self.concrete_model == f.rel.to._meta.concrete_model:
- proxy_cache[f.rel] = None
- self._related_objects_cache = cache
- self._related_objects_proxy_cache = proxy_cache
- def get_all_related_many_to_many_objects(self, local_only=False):
- try:
- cache = self._related_many_to_many_cache
- except AttributeError:
- cache = self._fill_related_many_to_many_cache()
- if local_only:
- return [k for k, v in cache.items() if not v]
- return list(cache)
- def get_all_related_m2m_objects_with_model(self):
- """
- Returns a list of (related-m2m-object, model) pairs. Similar to
- get_fields_with_model().
- """
- try:
- cache = self._related_many_to_many_cache
- except AttributeError:
- cache = self._fill_related_many_to_many_cache()
- return list(six.iteritems(cache))
- def _fill_related_many_to_many_cache(self):
- cache = OrderedDict()
- parent_list = self.get_parent_list()
- for parent in self.parents:
- for obj, model in parent._meta.get_all_related_m2m_objects_with_model():
- if obj.field.creation_counter < 0 and obj.model not in parent_list:
- continue
- if not model:
- cache[obj] = parent
- else:
- cache[obj] = model
- for klass in self.apps.get_models():
- if not klass._meta.swapped:
- for f in klass._meta.local_many_to_many:
- if (f.rel
- and not isinstance(f.rel.to, six.string_types)
- and self == f.rel.to._meta):
- cache[f.rel] = None
- if apps.ready:
- self._related_many_to_many_cache = cache
- return cache
- def get_base_chain(self, model):
- """
- Returns a list of parent classes leading to 'model' (order from closet
- to most distant ancestor). This has to handle the case were 'model' is
- a grandparent or even more distant relation.
- """
- if not self.parents:
- return None
- if model in self.parents:
- return [model]
- for parent in self.parents:
- res = parent._meta.get_base_chain(model)
- if res:
- res.insert(0, parent)
- return res
- return None
- def get_parent_list(self):
- """
- Returns a list of all the ancestor of this model as a list. Useful for
- determining if something is an ancestor, regardless of lineage.
- """
- result = set()
- for parent in self.parents:
- result.add(parent)
- result.update(parent._meta.get_parent_list())
- return result
- def get_ancestor_link(self, ancestor):
- """
- Returns the field on the current model which points to the given
- "ancestor". This is possible an indirect link (a pointer to a parent
- model, which points, eventually, to the ancestor). Used when
- constructing table joins for model inheritance.
- Returns None if the model isn't an ancestor of this one.
- """
- if ancestor in self.parents:
- return self.parents[ancestor]
- for parent in self.parents:
- # Tries to get a link field from the immediate parent
- parent_link = parent._meta.get_ancestor_link(ancestor)
- if parent_link:
- # In case of a proxied model, the first link
- # of the chain to the ancestor is that parent
- # links
- return self.parents[parent] or parent_link
|