inspectdb.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. import keyword
  2. import re
  3. from django.core.management.base import BaseCommand, CommandError
  4. from django.db import DEFAULT_DB_ALIAS, connections
  5. from django.db.models.constants import LOOKUP_SEP
  6. class Command(BaseCommand):
  7. help = (
  8. "Introspects the database tables in the given database and outputs a Django "
  9. "model module."
  10. )
  11. requires_system_checks = []
  12. stealth_options = ("table_name_filter",)
  13. db_module = "django.db"
  14. def add_arguments(self, parser):
  15. parser.add_argument(
  16. "table",
  17. nargs="*",
  18. type=str,
  19. help="Selects what tables or views should be introspected.",
  20. )
  21. parser.add_argument(
  22. "--database",
  23. default=DEFAULT_DB_ALIAS,
  24. help=(
  25. 'Nominates a database to introspect. Defaults to using the "default" '
  26. "database."
  27. ),
  28. )
  29. parser.add_argument(
  30. "--include-partitions",
  31. action="store_true",
  32. help="Also output models for partition tables.",
  33. )
  34. parser.add_argument(
  35. "--include-views",
  36. action="store_true",
  37. help="Also output models for database views.",
  38. )
  39. def handle(self, **options):
  40. try:
  41. for line in self.handle_inspection(options):
  42. self.stdout.write(line)
  43. except NotImplementedError:
  44. raise CommandError(
  45. "Database inspection isn't supported for the currently selected "
  46. "database backend."
  47. )
  48. def handle_inspection(self, options):
  49. connection = connections[options["database"]]
  50. # 'table_name_filter' is a stealth option
  51. table_name_filter = options.get("table_name_filter")
  52. def table2model(table_name):
  53. return re.sub(r"[^a-zA-Z0-9]", "", table_name.title())
  54. with connection.cursor() as cursor:
  55. yield "# This is an auto-generated Django model module."
  56. yield "# You'll have to do the following manually to clean this up:"
  57. yield "# * Rearrange models' order"
  58. yield "# * Make sure each model has one field with primary_key=True"
  59. yield (
  60. "# * Make sure each ForeignKey and OneToOneField has `on_delete` set "
  61. "to the desired behavior"
  62. )
  63. yield (
  64. "# * Remove `managed = False` lines if you wish to allow "
  65. "Django to create, modify, and delete the table"
  66. )
  67. yield (
  68. "# Feel free to rename the models, but don't rename db_table values or "
  69. "field names."
  70. )
  71. yield "from %s import models" % self.db_module
  72. known_models = []
  73. # Determine types of tables and/or views to be introspected.
  74. types = {"t"}
  75. if options["include_partitions"]:
  76. types.add("p")
  77. if options["include_views"]:
  78. types.add("v")
  79. table_info = connection.introspection.get_table_list(cursor)
  80. table_info = {info.name: info for info in table_info if info.type in types}
  81. for table_name in options["table"] or sorted(name for name in table_info):
  82. if table_name_filter is not None and callable(table_name_filter):
  83. if not table_name_filter(table_name):
  84. continue
  85. try:
  86. try:
  87. relations = connection.introspection.get_relations(
  88. cursor, table_name
  89. )
  90. except NotImplementedError:
  91. relations = {}
  92. try:
  93. constraints = connection.introspection.get_constraints(
  94. cursor, table_name
  95. )
  96. except NotImplementedError:
  97. constraints = {}
  98. primary_key_columns = (
  99. connection.introspection.get_primary_key_columns(
  100. cursor, table_name
  101. )
  102. )
  103. primary_key_column = (
  104. primary_key_columns[0] if primary_key_columns else None
  105. )
  106. unique_columns = [
  107. c["columns"][0]
  108. for c in constraints.values()
  109. if c["unique"] and len(c["columns"]) == 1
  110. ]
  111. table_description = connection.introspection.get_table_description(
  112. cursor, table_name
  113. )
  114. except Exception as e:
  115. yield "# Unable to inspect table '%s'" % table_name
  116. yield "# The error was: %s" % e
  117. continue
  118. model_name = table2model(table_name)
  119. yield ""
  120. yield ""
  121. yield "class %s(models.Model):" % model_name
  122. known_models.append(model_name)
  123. used_column_names = [] # Holds column names used in the table so far
  124. column_to_field_name = {} # Maps column names to names of model fields
  125. used_relations = set() # Holds foreign relations used in the table.
  126. for row in table_description:
  127. comment_notes = (
  128. []
  129. ) # Holds Field notes, to be displayed in a Python comment.
  130. extra_params = {} # Holds Field parameters such as 'db_column'.
  131. column_name = row.name
  132. is_relation = column_name in relations
  133. att_name, params, notes = self.normalize_col_name(
  134. column_name, used_column_names, is_relation
  135. )
  136. extra_params.update(params)
  137. comment_notes.extend(notes)
  138. used_column_names.append(att_name)
  139. column_to_field_name[column_name] = att_name
  140. # Add primary_key and unique, if necessary.
  141. if column_name == primary_key_column:
  142. extra_params["primary_key"] = True
  143. if len(primary_key_columns) > 1:
  144. comment_notes.append(
  145. "The composite primary key (%s) found, that is not "
  146. "supported. The first column is selected."
  147. % ", ".join(primary_key_columns)
  148. )
  149. elif column_name in unique_columns:
  150. extra_params["unique"] = True
  151. if is_relation:
  152. ref_db_column, ref_db_table = relations[column_name]
  153. if extra_params.pop("unique", False) or extra_params.get(
  154. "primary_key"
  155. ):
  156. rel_type = "OneToOneField"
  157. else:
  158. rel_type = "ForeignKey"
  159. ref_pk_column = (
  160. connection.introspection.get_primary_key_column(
  161. cursor, ref_db_table
  162. )
  163. )
  164. if ref_pk_column and ref_pk_column != ref_db_column:
  165. extra_params["to_field"] = ref_db_column
  166. rel_to = (
  167. "self"
  168. if ref_db_table == table_name
  169. else table2model(ref_db_table)
  170. )
  171. if rel_to in known_models:
  172. field_type = "%s(%s" % (rel_type, rel_to)
  173. else:
  174. field_type = "%s('%s'" % (rel_type, rel_to)
  175. if rel_to in used_relations:
  176. extra_params["related_name"] = "%s_%s_set" % (
  177. model_name.lower(),
  178. att_name,
  179. )
  180. used_relations.add(rel_to)
  181. else:
  182. # Calling `get_field_type` to get the field type string and any
  183. # additional parameters and notes.
  184. field_type, field_params, field_notes = self.get_field_type(
  185. connection, table_name, row
  186. )
  187. extra_params.update(field_params)
  188. comment_notes.extend(field_notes)
  189. field_type += "("
  190. # Don't output 'id = meta.AutoField(primary_key=True)', because
  191. # that's assumed if it doesn't exist.
  192. if att_name == "id" and extra_params == {"primary_key": True}:
  193. if field_type == "AutoField(":
  194. continue
  195. elif (
  196. field_type
  197. == connection.features.introspected_field_types["AutoField"]
  198. + "("
  199. ):
  200. comment_notes.append("AutoField?")
  201. # Add 'null' and 'blank', if the 'null_ok' flag was present in the
  202. # table description.
  203. if row.null_ok: # If it's NULL...
  204. extra_params["blank"] = True
  205. extra_params["null"] = True
  206. field_desc = "%s = %s%s" % (
  207. att_name,
  208. # Custom fields will have a dotted path
  209. "" if "." in field_type else "models.",
  210. field_type,
  211. )
  212. if field_type.startswith(("ForeignKey(", "OneToOneField(")):
  213. field_desc += ", models.DO_NOTHING"
  214. # Add comment.
  215. if connection.features.supports_comments and row.comment:
  216. extra_params["db_comment"] = row.comment
  217. if extra_params:
  218. if not field_desc.endswith("("):
  219. field_desc += ", "
  220. field_desc += ", ".join(
  221. "%s=%r" % (k, v) for k, v in extra_params.items()
  222. )
  223. field_desc += ")"
  224. if comment_notes:
  225. field_desc += " # " + " ".join(comment_notes)
  226. yield " %s" % field_desc
  227. comment = None
  228. if info := table_info.get(table_name):
  229. is_view = info.type == "v"
  230. is_partition = info.type == "p"
  231. if connection.features.supports_comments:
  232. comment = info.comment
  233. else:
  234. is_view = False
  235. is_partition = False
  236. yield from self.get_meta(
  237. table_name,
  238. constraints,
  239. column_to_field_name,
  240. is_view,
  241. is_partition,
  242. comment,
  243. )
  244. def normalize_col_name(self, col_name, used_column_names, is_relation):
  245. """
  246. Modify the column name to make it Python-compatible as a field name
  247. """
  248. field_params = {}
  249. field_notes = []
  250. new_name = col_name.lower()
  251. if new_name != col_name:
  252. field_notes.append("Field name made lowercase.")
  253. if is_relation:
  254. if new_name.endswith("_id"):
  255. new_name = new_name.removesuffix("_id")
  256. else:
  257. field_params["db_column"] = col_name
  258. new_name, num_repl = re.subn(r"\W", "_", new_name)
  259. if num_repl > 0:
  260. field_notes.append("Field renamed to remove unsuitable characters.")
  261. if new_name.find(LOOKUP_SEP) >= 0:
  262. while new_name.find(LOOKUP_SEP) >= 0:
  263. new_name = new_name.replace(LOOKUP_SEP, "_")
  264. if col_name.lower().find(LOOKUP_SEP) >= 0:
  265. # Only add the comment if the double underscore was in the original name
  266. field_notes.append(
  267. "Field renamed because it contained more than one '_' in a row."
  268. )
  269. if new_name.startswith("_"):
  270. new_name = "field%s" % new_name
  271. field_notes.append("Field renamed because it started with '_'.")
  272. if new_name.endswith("_"):
  273. new_name = "%sfield" % new_name
  274. field_notes.append("Field renamed because it ended with '_'.")
  275. if keyword.iskeyword(new_name):
  276. new_name += "_field"
  277. field_notes.append("Field renamed because it was a Python reserved word.")
  278. if new_name[0].isdigit():
  279. new_name = "number_%s" % new_name
  280. field_notes.append(
  281. "Field renamed because it wasn't a valid Python identifier."
  282. )
  283. if new_name in used_column_names:
  284. num = 0
  285. while "%s_%d" % (new_name, num) in used_column_names:
  286. num += 1
  287. new_name = "%s_%d" % (new_name, num)
  288. field_notes.append("Field renamed because of name conflict.")
  289. if col_name != new_name and field_notes:
  290. field_params["db_column"] = col_name
  291. return new_name, field_params, field_notes
  292. def get_field_type(self, connection, table_name, row):
  293. """
  294. Given the database connection, the table name, and the cursor row
  295. description, this routine will return the given field type name, as
  296. well as any additional keyword parameters and notes for the field.
  297. """
  298. field_params = {}
  299. field_notes = []
  300. try:
  301. field_type = connection.introspection.get_field_type(row.type_code, row)
  302. except KeyError:
  303. field_type = "TextField"
  304. field_notes.append("This field type is a guess.")
  305. # Add max_length for all CharFields.
  306. if field_type == "CharField" and row.display_size:
  307. if (size := int(row.display_size)) and size > 0:
  308. field_params["max_length"] = size
  309. if field_type in {"CharField", "TextField"} and row.collation:
  310. field_params["db_collation"] = row.collation
  311. if field_type == "DecimalField":
  312. if row.precision is None or row.scale is None:
  313. field_notes.append(
  314. "max_digits and decimal_places have been guessed, as this "
  315. "database handles decimal fields as float"
  316. )
  317. field_params["max_digits"] = (
  318. row.precision if row.precision is not None else 10
  319. )
  320. field_params["decimal_places"] = (
  321. row.scale if row.scale is not None else 5
  322. )
  323. else:
  324. field_params["max_digits"] = row.precision
  325. field_params["decimal_places"] = row.scale
  326. return field_type, field_params, field_notes
  327. def get_meta(
  328. self,
  329. table_name,
  330. constraints,
  331. column_to_field_name,
  332. is_view,
  333. is_partition,
  334. comment,
  335. ):
  336. """
  337. Return a sequence comprising the lines of code necessary
  338. to construct the inner Meta class for the model corresponding
  339. to the given database table name.
  340. """
  341. unique_together = []
  342. has_unsupported_constraint = False
  343. for params in constraints.values():
  344. if params["unique"]:
  345. columns = params["columns"]
  346. if None in columns:
  347. has_unsupported_constraint = True
  348. columns = [
  349. x for x in columns if x is not None and x in column_to_field_name
  350. ]
  351. if len(columns) > 1:
  352. unique_together.append(
  353. str(tuple(column_to_field_name[c] for c in columns))
  354. )
  355. if is_view:
  356. managed_comment = " # Created from a view. Don't remove."
  357. elif is_partition:
  358. managed_comment = " # Created from a partition. Don't remove."
  359. else:
  360. managed_comment = ""
  361. meta = [""]
  362. if has_unsupported_constraint:
  363. meta.append(" # A unique constraint could not be introspected.")
  364. meta += [
  365. " class Meta:",
  366. " managed = False%s" % managed_comment,
  367. " db_table = %r" % table_name,
  368. ]
  369. if unique_together:
  370. tup = "(" + ", ".join(unique_together) + ",)"
  371. meta += [" unique_together = %s" % tup]
  372. if comment:
  373. meta += [f" db_table_comment = {comment!r}"]
  374. return meta