test_extract_trunc.py 79 KB


  1. import datetime
  2. import zoneinfo
  3. from django.conf import settings
  4. from django.db import DataError, OperationalError
  5. from django.db.models import (
  6. DateField,
  7. DateTimeField,
  8. F,
  9. IntegerField,
  10. Max,
  11. OuterRef,
  12. Subquery,
  13. TimeField,
  14. )
  15. from django.db.models.functions import (
  16. Extract,
  17. ExtractDay,
  18. ExtractHour,
  19. ExtractIsoWeekDay,
  20. ExtractIsoYear,
  21. ExtractMinute,
  22. ExtractMonth,
  23. ExtractQuarter,
  24. ExtractSecond,
  25. ExtractWeek,
  26. ExtractWeekDay,
  27. ExtractYear,
  28. Trunc,
  29. TruncDate,
  30. TruncDay,
  31. TruncHour,
  32. TruncMinute,
  33. TruncMonth,
  34. TruncQuarter,
  35. TruncSecond,
  36. TruncTime,
  37. TruncWeek,
  38. TruncYear,
  39. )
  40. from django.test import (
  41. TestCase,
  42. override_settings,
  43. skipIfDBFeature,
  44. skipUnlessDBFeature,
  45. )
  46. from django.utils import timezone
  47. from ..models import Author, DTModel, Fan
  48. def truncate_to(value, kind, tzinfo=None):
  49. # Convert to target timezone before truncation
  50. if tzinfo is not None:
  51. value = value.astimezone(tzinfo)
  52. def truncate(value, kind):
  53. if kind == "second":
  54. return value.replace(microsecond=0)
  55. if kind == "minute":
  56. return value.replace(second=0, microsecond=0)
  57. if kind == "hour":
  58. return value.replace(minute=0, second=0, microsecond=0)
  59. if kind == "day":
  60. if isinstance(value, datetime.datetime):
  61. return value.replace(hour=0, minute=0, second=0, microsecond=0)
  62. return value
  63. if kind == "week":
  64. if isinstance(value, datetime.datetime):
  65. return (value - datetime.timedelta(days=value.weekday())).replace(
  66. hour=0, minute=0, second=0, microsecond=0
  67. )
  68. return value - datetime.timedelta(days=value.weekday())
  69. if kind == "month":
  70. if isinstance(value, datetime.datetime):
  71. return value.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
  72. return value.replace(day=1)
  73. if kind == "quarter":
  74. month_in_quarter = value.month - (value.month - 1) % 3
  75. if isinstance(value, datetime.datetime):
  76. return value.replace(
  77. month=month_in_quarter,
  78. day=1,
  79. hour=0,
  80. minute=0,
  81. second=0,
  82. microsecond=0,
  83. )
  84. return value.replace(month=month_in_quarter, day=1)
  85. # otherwise, truncate to year
  86. if isinstance(value, datetime.datetime):
  87. return value.replace(
  88. month=1, day=1, hour=0, minute=0, second=0, microsecond=0
  89. )
  90. return value.replace(month=1, day=1)
  91. value = truncate(value, kind)
  92. if tzinfo is not None:
  93. # If there was a daylight saving transition, then reset the timezone.
  94. value = timezone.make_aware(value.replace(tzinfo=None), tzinfo)
  95. return value
  96. @override_settings(USE_TZ=False)
  97. class DateFunctionTests(TestCase):
  98. def create_model(self, start_datetime, end_datetime):
  99. return DTModel.objects.create(
  100. name=start_datetime.isoformat() if start_datetime else "None",
  101. start_datetime=start_datetime,
  102. end_datetime=end_datetime,
  103. start_date=start_datetime.date() if start_datetime else None,
  104. end_date=end_datetime.date() if end_datetime else None,
  105. start_time=start_datetime.time() if start_datetime else None,
  106. end_time=end_datetime.time() if end_datetime else None,
  107. duration=(
  108. (end_datetime - start_datetime)
  109. if start_datetime and end_datetime
  110. else None
  111. ),
  112. )
  113. def test_extract_year_exact_lookup(self):
  114. """
  115. Extract year uses a BETWEEN filter to compare the year to allow indexes
  116. to be used.
  117. """
  118. start_datetime = datetime.datetime(2015, 6, 15, 14, 10)
  119. end_datetime = datetime.datetime(2016, 6, 15, 14, 10)
  120. if settings.USE_TZ:
  121. start_datetime = timezone.make_aware(start_datetime)
  122. end_datetime = timezone.make_aware(end_datetime)
  123. self.create_model(start_datetime, end_datetime)
  124. self.create_model(end_datetime, start_datetime)
  125. for lookup in ("year", "iso_year"):
  126. with self.subTest(lookup):
  127. qs = DTModel.objects.filter(
  128. **{"start_datetime__%s__exact" % lookup: 2015}
  129. )
  130. self.assertEqual(qs.count(), 1)
  131. query_string = str(qs.query).lower()
  132. self.assertEqual(query_string.count(" between "), 1)
  133. self.assertEqual(query_string.count("extract"), 0)
  134. # exact is implied and should be the same
  135. qs = DTModel.objects.filter(**{"start_datetime__%s" % lookup: 2015})
  136. self.assertEqual(qs.count(), 1)
  137. query_string = str(qs.query).lower()
  138. self.assertEqual(query_string.count(" between "), 1)
  139. self.assertEqual(query_string.count("extract"), 0)
  140. # date and datetime fields should behave the same
  141. qs = DTModel.objects.filter(**{"start_date__%s" % lookup: 2015})
  142. self.assertEqual(qs.count(), 1)
  143. query_string = str(qs.query).lower()
  144. self.assertEqual(query_string.count(" between "), 1)
  145. self.assertEqual(query_string.count("extract"), 0)
  146. # an expression rhs cannot use the between optimization.
  147. qs = DTModel.objects.annotate(
  148. start_year=ExtractYear("start_datetime"),
  149. ).filter(end_datetime__year=F("start_year") + 1)
  150. self.assertEqual(qs.count(), 1)
  151. query_string = str(qs.query).lower()
  152. self.assertEqual(query_string.count(" between "), 0)
  153. self.assertEqual(query_string.count("extract"), 3)
  154. def test_extract_year_greaterthan_lookup(self):
  155. start_datetime = datetime.datetime(2015, 6, 15, 14, 10)
  156. end_datetime = datetime.datetime(2016, 6, 15, 14, 10)
  157. if settings.USE_TZ:
  158. start_datetime = timezone.make_aware(start_datetime)
  159. end_datetime = timezone.make_aware(end_datetime)
  160. self.create_model(start_datetime, end_datetime)
  161. self.create_model(end_datetime, start_datetime)
  162. for lookup in ("year", "iso_year"):
  163. with self.subTest(lookup):
  164. qs = DTModel.objects.filter(**{"start_datetime__%s__gt" % lookup: 2015})
  165. self.assertEqual(qs.count(), 1)
  166. self.assertEqual(str(qs.query).lower().count("extract"), 0)
  167. qs = DTModel.objects.filter(
  168. **{"start_datetime__%s__gte" % lookup: 2015}
  169. )
  170. self.assertEqual(qs.count(), 2)
  171. self.assertEqual(str(qs.query).lower().count("extract"), 0)
  172. qs = DTModel.objects.annotate(
  173. start_year=ExtractYear("start_datetime"),
  174. ).filter(**{"end_datetime__%s__gte" % lookup: F("start_year")})
  175. self.assertEqual(qs.count(), 1)
  176. self.assertGreaterEqual(str(qs.query).lower().count("extract"), 2)
  177. def test_extract_year_lessthan_lookup(self):
  178. start_datetime = datetime.datetime(2015, 6, 15, 14, 10)
  179. end_datetime = datetime.datetime(2016, 6, 15, 14, 10)
  180. if settings.USE_TZ:
  181. start_datetime = timezone.make_aware(start_datetime)
  182. end_datetime = timezone.make_aware(end_datetime)
  183. self.create_model(start_datetime, end_datetime)
  184. self.create_model(end_datetime, start_datetime)
  185. for lookup in ("year", "iso_year"):
  186. with self.subTest(lookup):
  187. qs = DTModel.objects.filter(**{"start_datetime__%s__lt" % lookup: 2016})
  188. self.assertEqual(qs.count(), 1)
  189. self.assertEqual(str(qs.query).count("extract"), 0)
  190. qs = DTModel.objects.filter(
  191. **{"start_datetime__%s__lte" % lookup: 2016}
  192. )
  193. self.assertEqual(qs.count(), 2)
  194. self.assertEqual(str(qs.query).count("extract"), 0)
  195. qs = DTModel.objects.annotate(
  196. end_year=ExtractYear("end_datetime"),
  197. ).filter(**{"start_datetime__%s__lte" % lookup: F("end_year")})
  198. self.assertEqual(qs.count(), 1)
  199. self.assertGreaterEqual(str(qs.query).lower().count("extract"), 2)
  200. def test_extract_lookup_name_sql_injection(self):
  201. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  202. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  203. if settings.USE_TZ:
  204. start_datetime = timezone.make_aware(start_datetime)
  205. end_datetime = timezone.make_aware(end_datetime)
  206. self.create_model(start_datetime, end_datetime)
  207. self.create_model(end_datetime, start_datetime)
  208. with self.assertRaises((OperationalError, ValueError)):
  209. DTModel.objects.filter(
  210. start_datetime__year=Extract(
  211. "start_datetime", "day' FROM start_datetime)) OR 1=1;--"
  212. )
  213. ).exists()
  214. def test_extract_func(self):
  215. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  216. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  217. if settings.USE_TZ:
  218. start_datetime = timezone.make_aware(start_datetime)
  219. end_datetime = timezone.make_aware(end_datetime)
  220. self.create_model(start_datetime, end_datetime)
  221. self.create_model(end_datetime, start_datetime)
  222. with self.assertRaisesMessage(ValueError, "lookup_name must be provided"):
  223. Extract("start_datetime")
  224. msg = (
  225. "Extract input expression must be DateField, DateTimeField, TimeField, or "
  226. "DurationField."
  227. )
  228. with self.assertRaisesMessage(ValueError, msg):
  229. list(DTModel.objects.annotate(extracted=Extract("name", "hour")))
  230. with self.assertRaisesMessage(
  231. ValueError,
  232. "Cannot extract time component 'second' from DateField 'start_date'.",
  233. ):
  234. list(DTModel.objects.annotate(extracted=Extract("start_date", "second")))
  235. self.assertQuerySetEqual(
  236. DTModel.objects.annotate(
  237. extracted=Extract("start_datetime", "year")
  238. ).order_by("start_datetime"),
  239. [(start_datetime, start_datetime.year), (end_datetime, end_datetime.year)],
  240. lambda m: (m.start_datetime, m.extracted),
  241. )
  242. self.assertQuerySetEqual(
  243. DTModel.objects.annotate(
  244. extracted=Extract("start_datetime", "quarter")
  245. ).order_by("start_datetime"),
  246. [(start_datetime, 2), (end_datetime, 2)],
  247. lambda m: (m.start_datetime, m.extracted),
  248. )
  249. self.assertQuerySetEqual(
  250. DTModel.objects.annotate(
  251. extracted=Extract("start_datetime", "month")
  252. ).order_by("start_datetime"),
  253. [
  254. (start_datetime, start_datetime.month),
  255. (end_datetime, end_datetime.month),
  256. ],
  257. lambda m: (m.start_datetime, m.extracted),
  258. )
  259. self.assertQuerySetEqual(
  260. DTModel.objects.annotate(
  261. extracted=Extract("start_datetime", "day")
  262. ).order_by("start_datetime"),
  263. [(start_datetime, start_datetime.day), (end_datetime, end_datetime.day)],
  264. lambda m: (m.start_datetime, m.extracted),
  265. )
  266. self.assertQuerySetEqual(
  267. DTModel.objects.annotate(
  268. extracted=Extract("start_datetime", "week")
  269. ).order_by("start_datetime"),
  270. [(start_datetime, 25), (end_datetime, 24)],
  271. lambda m: (m.start_datetime, m.extracted),
  272. )
  273. self.assertQuerySetEqual(
  274. DTModel.objects.annotate(
  275. extracted=Extract("start_datetime", "week_day")
  276. ).order_by("start_datetime"),
  277. [
  278. (start_datetime, (start_datetime.isoweekday() % 7) + 1),
  279. (end_datetime, (end_datetime.isoweekday() % 7) + 1),
  280. ],
  281. lambda m: (m.start_datetime, m.extracted),
  282. )
  283. self.assertQuerySetEqual(
  284. DTModel.objects.annotate(
  285. extracted=Extract("start_datetime", "iso_week_day"),
  286. ).order_by("start_datetime"),
  287. [
  288. (start_datetime, start_datetime.isoweekday()),
  289. (end_datetime, end_datetime.isoweekday()),
  290. ],
  291. lambda m: (m.start_datetime, m.extracted),
  292. )
  293. self.assertQuerySetEqual(
  294. DTModel.objects.annotate(
  295. extracted=Extract("start_datetime", "hour")
  296. ).order_by("start_datetime"),
  297. [(start_datetime, start_datetime.hour), (end_datetime, end_datetime.hour)],
  298. lambda m: (m.start_datetime, m.extracted),
  299. )
  300. self.assertQuerySetEqual(
  301. DTModel.objects.annotate(
  302. extracted=Extract("start_datetime", "minute")
  303. ).order_by("start_datetime"),
  304. [
  305. (start_datetime, start_datetime.minute),
  306. (end_datetime, end_datetime.minute),
  307. ],
  308. lambda m: (m.start_datetime, m.extracted),
  309. )
  310. self.assertQuerySetEqual(
  311. DTModel.objects.annotate(
  312. extracted=Extract("start_datetime", "second")
  313. ).order_by("start_datetime"),
  314. [
  315. (start_datetime, start_datetime.second),
  316. (end_datetime, end_datetime.second),
  317. ],
  318. lambda m: (m.start_datetime, m.extracted),
  319. )
  320. self.assertEqual(
  321. DTModel.objects.filter(
  322. start_datetime__year=Extract("start_datetime", "year")
  323. ).count(),
  324. 2,
  325. )
  326. self.assertEqual(
  327. DTModel.objects.filter(
  328. start_datetime__hour=Extract("start_datetime", "hour")
  329. ).count(),
  330. 2,
  331. )
  332. self.assertEqual(
  333. DTModel.objects.filter(
  334. start_date__month=Extract("start_date", "month")
  335. ).count(),
  336. 2,
  337. )
  338. self.assertEqual(
  339. DTModel.objects.filter(
  340. start_time__hour=Extract("start_time", "hour")
  341. ).count(),
  342. 2,
  343. )
  344. def test_extract_none(self):
  345. self.create_model(None, None)
  346. for t in (
  347. Extract("start_datetime", "year"),
  348. Extract("start_date", "year"),
  349. Extract("start_time", "hour"),
  350. ):
  351. with self.subTest(t):
  352. self.assertIsNone(
  353. DTModel.objects.annotate(extracted=t).first().extracted
  354. )
  355. def test_extract_outerref_validation(self):
  356. inner_qs = DTModel.objects.filter(name=ExtractMonth(OuterRef("name")))
  357. msg = (
  358. "Extract input expression must be DateField, DateTimeField, "
  359. "TimeField, or DurationField."
  360. )
  361. with self.assertRaisesMessage(ValueError, msg):
  362. DTModel.objects.annotate(related_name=Subquery(inner_qs.values("name")[:1]))
  363. @skipUnlessDBFeature("has_native_duration_field")
  364. def test_extract_duration(self):
  365. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  366. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  367. if settings.USE_TZ:
  368. start_datetime = timezone.make_aware(start_datetime)
  369. end_datetime = timezone.make_aware(end_datetime)
  370. self.create_model(start_datetime, end_datetime)
  371. self.create_model(end_datetime, start_datetime)
  372. self.assertQuerySetEqual(
  373. DTModel.objects.annotate(extracted=Extract("duration", "second")).order_by(
  374. "start_datetime"
  375. ),
  376. [
  377. (start_datetime, (end_datetime - start_datetime).seconds % 60),
  378. (end_datetime, (start_datetime - end_datetime).seconds % 60),
  379. ],
  380. lambda m: (m.start_datetime, m.extracted),
  381. )
  382. self.assertEqual(
  383. DTModel.objects.annotate(
  384. duration_days=Extract("duration", "day"),
  385. )
  386. .filter(duration_days__gt=200)
  387. .count(),
  388. 1,
  389. )
  390. @skipIfDBFeature("has_native_duration_field")
  391. def test_extract_duration_without_native_duration_field(self):
  392. msg = "Extract requires native DurationField database support."
  393. with self.assertRaisesMessage(ValueError, msg):
  394. list(DTModel.objects.annotate(extracted=Extract("duration", "second")))
  395. def test_extract_duration_unsupported_lookups(self):
  396. msg = "Cannot extract component '%s' from DurationField 'duration'."
  397. for lookup in (
  398. "year",
  399. "iso_year",
  400. "month",
  401. "week",
  402. "week_day",
  403. "iso_week_day",
  404. "quarter",
  405. ):
  406. with self.subTest(lookup):
  407. with self.assertRaisesMessage(ValueError, msg % lookup):
  408. DTModel.objects.annotate(extracted=Extract("duration", lookup))
  409. def test_extract_year_func(self):
  410. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  411. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  412. if settings.USE_TZ:
  413. start_datetime = timezone.make_aware(start_datetime)
  414. end_datetime = timezone.make_aware(end_datetime)
  415. self.create_model(start_datetime, end_datetime)
  416. self.create_model(end_datetime, start_datetime)
  417. self.assertQuerySetEqual(
  418. DTModel.objects.annotate(extracted=ExtractYear("start_datetime")).order_by(
  419. "start_datetime"
  420. ),
  421. [(start_datetime, start_datetime.year), (end_datetime, end_datetime.year)],
  422. lambda m: (m.start_datetime, m.extracted),
  423. )
  424. self.assertQuerySetEqual(
  425. DTModel.objects.annotate(extracted=ExtractYear("start_date")).order_by(
  426. "start_datetime"
  427. ),
  428. [(start_datetime, start_datetime.year), (end_datetime, end_datetime.year)],
  429. lambda m: (m.start_datetime, m.extracted),
  430. )
  431. self.assertEqual(
  432. DTModel.objects.filter(
  433. start_datetime__year=ExtractYear("start_datetime")
  434. ).count(),
  435. 2,
  436. )
  437. def test_extract_iso_year_func(self):
  438. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  439. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  440. if settings.USE_TZ:
  441. start_datetime = timezone.make_aware(start_datetime)
  442. end_datetime = timezone.make_aware(end_datetime)
  443. self.create_model(start_datetime, end_datetime)
  444. self.create_model(end_datetime, start_datetime)
  445. self.assertQuerySetEqual(
  446. DTModel.objects.annotate(
  447. extracted=ExtractIsoYear("start_datetime")
  448. ).order_by("start_datetime"),
  449. [(start_datetime, start_datetime.year), (end_datetime, end_datetime.year)],
  450. lambda m: (m.start_datetime, m.extracted),
  451. )
  452. self.assertQuerySetEqual(
  453. DTModel.objects.annotate(extracted=ExtractIsoYear("start_date")).order_by(
  454. "start_datetime"
  455. ),
  456. [(start_datetime, start_datetime.year), (end_datetime, end_datetime.year)],
  457. lambda m: (m.start_datetime, m.extracted),
  458. )
  459. # Both dates are from the same week year.
  460. self.assertEqual(
  461. DTModel.objects.filter(
  462. start_datetime__iso_year=ExtractIsoYear("start_datetime")
  463. ).count(),
  464. 2,
  465. )
  466. def test_extract_iso_year_func_boundaries(self):
  467. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  468. if settings.USE_TZ:
  469. end_datetime = timezone.make_aware(end_datetime)
  470. week_52_day_2014 = datetime.datetime(2014, 12, 27, 13, 0) # Sunday
  471. week_1_day_2014_2015 = datetime.datetime(2014, 12, 31, 13, 0) # Wednesday
  472. week_53_day_2015 = datetime.datetime(2015, 12, 31, 13, 0) # Thursday
  473. if settings.USE_TZ:
  474. week_1_day_2014_2015 = timezone.make_aware(week_1_day_2014_2015)
  475. week_52_day_2014 = timezone.make_aware(week_52_day_2014)
  476. week_53_day_2015 = timezone.make_aware(week_53_day_2015)
  477. days = [week_52_day_2014, week_1_day_2014_2015, week_53_day_2015]
  478. obj_1_iso_2014 = self.create_model(week_52_day_2014, end_datetime)
  479. obj_1_iso_2015 = self.create_model(week_1_day_2014_2015, end_datetime)
  480. obj_2_iso_2015 = self.create_model(week_53_day_2015, end_datetime)
  481. qs = (
  482. DTModel.objects.filter(start_datetime__in=days)
  483. .annotate(
  484. extracted=ExtractIsoYear("start_datetime"),
  485. )
  486. .order_by("start_datetime")
  487. )
  488. self.assertQuerySetEqual(
  489. qs,
  490. [
  491. (week_52_day_2014, 2014),
  492. (week_1_day_2014_2015, 2015),
  493. (week_53_day_2015, 2015),
  494. ],
  495. lambda m: (m.start_datetime, m.extracted),
  496. )
  497. qs = DTModel.objects.filter(
  498. start_datetime__iso_year=2015,
  499. ).order_by("start_datetime")
  500. self.assertSequenceEqual(qs, [obj_1_iso_2015, obj_2_iso_2015])
  501. qs = DTModel.objects.filter(
  502. start_datetime__iso_year__gt=2014,
  503. ).order_by("start_datetime")
  504. self.assertSequenceEqual(qs, [obj_1_iso_2015, obj_2_iso_2015])
  505. qs = DTModel.objects.filter(
  506. start_datetime__iso_year__lte=2014,
  507. ).order_by("start_datetime")
  508. self.assertSequenceEqual(qs, [obj_1_iso_2014])
  509. def test_extract_month_func(self):
  510. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  511. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  512. if settings.USE_TZ:
  513. start_datetime = timezone.make_aware(start_datetime)
  514. end_datetime = timezone.make_aware(end_datetime)
  515. self.create_model(start_datetime, end_datetime)
  516. self.create_model(end_datetime, start_datetime)
  517. self.assertQuerySetEqual(
  518. DTModel.objects.annotate(extracted=ExtractMonth("start_datetime")).order_by(
  519. "start_datetime"
  520. ),
  521. [
  522. (start_datetime, start_datetime.month),
  523. (end_datetime, end_datetime.month),
  524. ],
  525. lambda m: (m.start_datetime, m.extracted),
  526. )
  527. self.assertQuerySetEqual(
  528. DTModel.objects.annotate(extracted=ExtractMonth("start_date")).order_by(
  529. "start_datetime"
  530. ),
  531. [
  532. (start_datetime, start_datetime.month),
  533. (end_datetime, end_datetime.month),
  534. ],
  535. lambda m: (m.start_datetime, m.extracted),
  536. )
  537. self.assertEqual(
  538. DTModel.objects.filter(
  539. start_datetime__month=ExtractMonth("start_datetime")
  540. ).count(),
  541. 2,
  542. )
  543. def test_extract_day_func(self):
  544. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  545. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  546. if settings.USE_TZ:
  547. start_datetime = timezone.make_aware(start_datetime)
  548. end_datetime = timezone.make_aware(end_datetime)
  549. self.create_model(start_datetime, end_datetime)
  550. self.create_model(end_datetime, start_datetime)
  551. self.assertQuerySetEqual(
  552. DTModel.objects.annotate(extracted=ExtractDay("start_datetime")).order_by(
  553. "start_datetime"
  554. ),
  555. [(start_datetime, start_datetime.day), (end_datetime, end_datetime.day)],
  556. lambda m: (m.start_datetime, m.extracted),
  557. )
  558. self.assertQuerySetEqual(
  559. DTModel.objects.annotate(extracted=ExtractDay("start_date")).order_by(
  560. "start_datetime"
  561. ),
  562. [(start_datetime, start_datetime.day), (end_datetime, end_datetime.day)],
  563. lambda m: (m.start_datetime, m.extracted),
  564. )
  565. self.assertEqual(
  566. DTModel.objects.filter(
  567. start_datetime__day=ExtractDay("start_datetime")
  568. ).count(),
  569. 2,
  570. )
  571. def test_extract_week_func(self):
  572. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  573. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  574. if settings.USE_TZ:
  575. start_datetime = timezone.make_aware(start_datetime)
  576. end_datetime = timezone.make_aware(end_datetime)
  577. self.create_model(start_datetime, end_datetime)
  578. self.create_model(end_datetime, start_datetime)
  579. self.assertQuerySetEqual(
  580. DTModel.objects.annotate(extracted=ExtractWeek("start_datetime")).order_by(
  581. "start_datetime"
  582. ),
  583. [(start_datetime, 25), (end_datetime, 24)],
  584. lambda m: (m.start_datetime, m.extracted),
  585. )
  586. self.assertQuerySetEqual(
  587. DTModel.objects.annotate(extracted=ExtractWeek("start_date")).order_by(
  588. "start_datetime"
  589. ),
  590. [(start_datetime, 25), (end_datetime, 24)],
  591. lambda m: (m.start_datetime, m.extracted),
  592. )
  593. # both dates are from the same week.
  594. self.assertEqual(
  595. DTModel.objects.filter(
  596. start_datetime__week=ExtractWeek("start_datetime")
  597. ).count(),
  598. 2,
  599. )
  600. def test_extract_quarter_func(self):
  601. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  602. end_datetime = datetime.datetime(2016, 8, 15, 14, 10, 50, 123)
  603. if settings.USE_TZ:
  604. start_datetime = timezone.make_aware(start_datetime)
  605. end_datetime = timezone.make_aware(end_datetime)
  606. self.create_model(start_datetime, end_datetime)
  607. self.create_model(end_datetime, start_datetime)
  608. self.assertQuerySetEqual(
  609. DTModel.objects.annotate(
  610. extracted=ExtractQuarter("start_datetime")
  611. ).order_by("start_datetime"),
  612. [(start_datetime, 2), (end_datetime, 3)],
  613. lambda m: (m.start_datetime, m.extracted),
  614. )
  615. self.assertQuerySetEqual(
  616. DTModel.objects.annotate(extracted=ExtractQuarter("start_date")).order_by(
  617. "start_datetime"
  618. ),
  619. [(start_datetime, 2), (end_datetime, 3)],
  620. lambda m: (m.start_datetime, m.extracted),
  621. )
  622. self.assertEqual(
  623. DTModel.objects.filter(
  624. start_datetime__quarter=ExtractQuarter("start_datetime")
  625. ).count(),
  626. 2,
  627. )
  628. def test_extract_quarter_func_boundaries(self):
  629. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  630. if settings.USE_TZ:
  631. end_datetime = timezone.make_aware(end_datetime)
  632. last_quarter_2014 = datetime.datetime(2014, 12, 31, 13, 0)
  633. first_quarter_2015 = datetime.datetime(2015, 1, 1, 13, 0)
  634. if settings.USE_TZ:
  635. last_quarter_2014 = timezone.make_aware(last_quarter_2014)
  636. first_quarter_2015 = timezone.make_aware(first_quarter_2015)
  637. dates = [last_quarter_2014, first_quarter_2015]
  638. self.create_model(last_quarter_2014, end_datetime)
  639. self.create_model(first_quarter_2015, end_datetime)
  640. qs = (
  641. DTModel.objects.filter(start_datetime__in=dates)
  642. .annotate(
  643. extracted=ExtractQuarter("start_datetime"),
  644. )
  645. .order_by("start_datetime")
  646. )
  647. self.assertQuerySetEqual(
  648. qs,
  649. [
  650. (last_quarter_2014, 4),
  651. (first_quarter_2015, 1),
  652. ],
  653. lambda m: (m.start_datetime, m.extracted),
  654. )
  655. def test_extract_week_func_boundaries(self):
  656. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  657. if settings.USE_TZ:
  658. end_datetime = timezone.make_aware(end_datetime)
  659. week_52_day_2014 = datetime.datetime(2014, 12, 27, 13, 0) # Sunday
  660. week_1_day_2014_2015 = datetime.datetime(2014, 12, 31, 13, 0) # Wednesday
  661. week_53_day_2015 = datetime.datetime(2015, 12, 31, 13, 0) # Thursday
  662. if settings.USE_TZ:
  663. week_1_day_2014_2015 = timezone.make_aware(week_1_day_2014_2015)
  664. week_52_day_2014 = timezone.make_aware(week_52_day_2014)
  665. week_53_day_2015 = timezone.make_aware(week_53_day_2015)
  666. days = [week_52_day_2014, week_1_day_2014_2015, week_53_day_2015]
  667. self.create_model(week_53_day_2015, end_datetime)
  668. self.create_model(week_52_day_2014, end_datetime)
  669. self.create_model(week_1_day_2014_2015, end_datetime)
  670. qs = (
  671. DTModel.objects.filter(start_datetime__in=days)
  672. .annotate(
  673. extracted=ExtractWeek("start_datetime"),
  674. )
  675. .order_by("start_datetime")
  676. )
  677. self.assertQuerySetEqual(
  678. qs,
  679. [
  680. (week_52_day_2014, 52),
  681. (week_1_day_2014_2015, 1),
  682. (week_53_day_2015, 53),
  683. ],
  684. lambda m: (m.start_datetime, m.extracted),
  685. )
  686. def test_extract_weekday_func(self):
  687. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  688. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  689. if settings.USE_TZ:
  690. start_datetime = timezone.make_aware(start_datetime)
  691. end_datetime = timezone.make_aware(end_datetime)
  692. self.create_model(start_datetime, end_datetime)
  693. self.create_model(end_datetime, start_datetime)
  694. self.assertQuerySetEqual(
  695. DTModel.objects.annotate(
  696. extracted=ExtractWeekDay("start_datetime")
  697. ).order_by("start_datetime"),
  698. [
  699. (start_datetime, (start_datetime.isoweekday() % 7) + 1),
  700. (end_datetime, (end_datetime.isoweekday() % 7) + 1),
  701. ],
  702. lambda m: (m.start_datetime, m.extracted),
  703. )
  704. self.assertQuerySetEqual(
  705. DTModel.objects.annotate(extracted=ExtractWeekDay("start_date")).order_by(
  706. "start_datetime"
  707. ),
  708. [
  709. (start_datetime, (start_datetime.isoweekday() % 7) + 1),
  710. (end_datetime, (end_datetime.isoweekday() % 7) + 1),
  711. ],
  712. lambda m: (m.start_datetime, m.extracted),
  713. )
  714. self.assertEqual(
  715. DTModel.objects.filter(
  716. start_datetime__week_day=ExtractWeekDay("start_datetime")
  717. ).count(),
  718. 2,
  719. )
  720. def test_extract_iso_weekday_func(self):
  721. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  722. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  723. if settings.USE_TZ:
  724. start_datetime = timezone.make_aware(start_datetime)
  725. end_datetime = timezone.make_aware(end_datetime)
  726. self.create_model(start_datetime, end_datetime)
  727. self.create_model(end_datetime, start_datetime)
  728. self.assertQuerySetEqual(
  729. DTModel.objects.annotate(
  730. extracted=ExtractIsoWeekDay("start_datetime"),
  731. ).order_by("start_datetime"),
  732. [
  733. (start_datetime, start_datetime.isoweekday()),
  734. (end_datetime, end_datetime.isoweekday()),
  735. ],
  736. lambda m: (m.start_datetime, m.extracted),
  737. )
  738. self.assertQuerySetEqual(
  739. DTModel.objects.annotate(
  740. extracted=ExtractIsoWeekDay("start_date"),
  741. ).order_by("start_datetime"),
  742. [
  743. (start_datetime, start_datetime.isoweekday()),
  744. (end_datetime, end_datetime.isoweekday()),
  745. ],
  746. lambda m: (m.start_datetime, m.extracted),
  747. )
  748. self.assertEqual(
  749. DTModel.objects.filter(
  750. start_datetime__week_day=ExtractWeekDay("start_datetime"),
  751. ).count(),
  752. 2,
  753. )
  754. def test_extract_hour_func(self):
  755. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  756. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  757. if settings.USE_TZ:
  758. start_datetime = timezone.make_aware(start_datetime)
  759. end_datetime = timezone.make_aware(end_datetime)
  760. self.create_model(start_datetime, end_datetime)
  761. self.create_model(end_datetime, start_datetime)
  762. self.assertQuerySetEqual(
  763. DTModel.objects.annotate(extracted=ExtractHour("start_datetime")).order_by(
  764. "start_datetime"
  765. ),
  766. [(start_datetime, start_datetime.hour), (end_datetime, end_datetime.hour)],
  767. lambda m: (m.start_datetime, m.extracted),
  768. )
  769. self.assertQuerySetEqual(
  770. DTModel.objects.annotate(extracted=ExtractHour("start_time")).order_by(
  771. "start_datetime"
  772. ),
  773. [(start_datetime, start_datetime.hour), (end_datetime, end_datetime.hour)],
  774. lambda m: (m.start_datetime, m.extracted),
  775. )
  776. self.assertEqual(
  777. DTModel.objects.filter(
  778. start_datetime__hour=ExtractHour("start_datetime")
  779. ).count(),
  780. 2,
  781. )
  782. def test_extract_minute_func(self):
  783. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  784. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  785. if settings.USE_TZ:
  786. start_datetime = timezone.make_aware(start_datetime)
  787. end_datetime = timezone.make_aware(end_datetime)
  788. self.create_model(start_datetime, end_datetime)
  789. self.create_model(end_datetime, start_datetime)
  790. self.assertQuerySetEqual(
  791. DTModel.objects.annotate(
  792. extracted=ExtractMinute("start_datetime")
  793. ).order_by("start_datetime"),
  794. [
  795. (start_datetime, start_datetime.minute),
  796. (end_datetime, end_datetime.minute),
  797. ],
  798. lambda m: (m.start_datetime, m.extracted),
  799. )
  800. self.assertQuerySetEqual(
  801. DTModel.objects.annotate(extracted=ExtractMinute("start_time")).order_by(
  802. "start_datetime"
  803. ),
  804. [
  805. (start_datetime, start_datetime.minute),
  806. (end_datetime, end_datetime.minute),
  807. ],
  808. lambda m: (m.start_datetime, m.extracted),
  809. )
  810. self.assertEqual(
  811. DTModel.objects.filter(
  812. start_datetime__minute=ExtractMinute("start_datetime")
  813. ).count(),
  814. 2,
  815. )
  816. def test_extract_second_func(self):
  817. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  818. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  819. if settings.USE_TZ:
  820. start_datetime = timezone.make_aware(start_datetime)
  821. end_datetime = timezone.make_aware(end_datetime)
  822. self.create_model(start_datetime, end_datetime)
  823. self.create_model(end_datetime, start_datetime)
  824. self.assertQuerySetEqual(
  825. DTModel.objects.annotate(
  826. extracted=ExtractSecond("start_datetime")
  827. ).order_by("start_datetime"),
  828. [
  829. (start_datetime, start_datetime.second),
  830. (end_datetime, end_datetime.second),
  831. ],
  832. lambda m: (m.start_datetime, m.extracted),
  833. )
  834. self.assertQuerySetEqual(
  835. DTModel.objects.annotate(extracted=ExtractSecond("start_time")).order_by(
  836. "start_datetime"
  837. ),
  838. [
  839. (start_datetime, start_datetime.second),
  840. (end_datetime, end_datetime.second),
  841. ],
  842. lambda m: (m.start_datetime, m.extracted),
  843. )
  844. self.assertEqual(
  845. DTModel.objects.filter(
  846. start_datetime__second=ExtractSecond("start_datetime")
  847. ).count(),
  848. 2,
  849. )
  850. def test_extract_second_func_no_fractional(self):
  851. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  852. end_datetime = datetime.datetime(2016, 6, 15, 14, 30, 50, 783)
  853. if settings.USE_TZ:
  854. start_datetime = timezone.make_aware(start_datetime)
  855. end_datetime = timezone.make_aware(end_datetime)
  856. obj = self.create_model(start_datetime, end_datetime)
  857. self.assertSequenceEqual(
  858. DTModel.objects.filter(start_datetime__second=F("end_datetime__second")),
  859. [obj],
  860. )
  861. self.assertSequenceEqual(
  862. DTModel.objects.filter(start_time__second=F("end_time__second")),
  863. [obj],
  864. )
  865. def test_trunc_lookup_name_sql_injection(self):
  866. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  867. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  868. if settings.USE_TZ:
  869. start_datetime = timezone.make_aware(start_datetime)
  870. end_datetime = timezone.make_aware(end_datetime)
  871. self.create_model(start_datetime, end_datetime)
  872. self.create_model(end_datetime, start_datetime)
  873. # Database backends raise an exception or don't return any results.
  874. try:
  875. exists = DTModel.objects.filter(
  876. start_datetime__date=Trunc(
  877. "start_datetime",
  878. "year', start_datetime)) OR 1=1;--",
  879. )
  880. ).exists()
  881. except (DataError, OperationalError):
  882. pass
  883. else:
  884. self.assertIs(exists, False)
  885. def test_trunc_func(self):
  886. start_datetime = datetime.datetime(999, 6, 15, 14, 30, 50, 321)
  887. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  888. if settings.USE_TZ:
  889. start_datetime = timezone.make_aware(start_datetime)
  890. end_datetime = timezone.make_aware(end_datetime)
  891. self.create_model(start_datetime, end_datetime)
  892. self.create_model(end_datetime, start_datetime)
  893. def assertDatetimeKind(kind):
  894. truncated_start = truncate_to(start_datetime, kind)
  895. truncated_end = truncate_to(end_datetime, kind)
  896. queryset = DTModel.objects.annotate(
  897. truncated=Trunc("start_datetime", kind, output_field=DateTimeField())
  898. ).order_by("start_datetime")
  899. self.assertSequenceEqual(
  900. queryset.values_list("start_datetime", "truncated"),
  901. [
  902. (start_datetime, truncated_start),
  903. (end_datetime, truncated_end),
  904. ],
  905. )
  906. def assertDateKind(kind):
  907. truncated_start = truncate_to(start_datetime.date(), kind)
  908. truncated_end = truncate_to(end_datetime.date(), kind)
  909. queryset = DTModel.objects.annotate(
  910. truncated=Trunc("start_date", kind, output_field=DateField())
  911. ).order_by("start_datetime")
  912. self.assertSequenceEqual(
  913. queryset.values_list("start_datetime", "truncated"),
  914. [
  915. (start_datetime, truncated_start),
  916. (end_datetime, truncated_end),
  917. ],
  918. )
  919. def assertTimeKind(kind):
  920. truncated_start = truncate_to(start_datetime.time(), kind)
  921. truncated_end = truncate_to(end_datetime.time(), kind)
  922. queryset = DTModel.objects.annotate(
  923. truncated=Trunc("start_time", kind, output_field=TimeField())
  924. ).order_by("start_datetime")
  925. self.assertSequenceEqual(
  926. queryset.values_list("start_datetime", "truncated"),
  927. [
  928. (start_datetime, truncated_start),
  929. (end_datetime, truncated_end),
  930. ],
  931. )
  932. def assertDatetimeToTimeKind(kind):
  933. truncated_start = truncate_to(start_datetime.time(), kind)
  934. truncated_end = truncate_to(end_datetime.time(), kind)
  935. queryset = DTModel.objects.annotate(
  936. truncated=Trunc("start_datetime", kind, output_field=TimeField()),
  937. ).order_by("start_datetime")
  938. self.assertSequenceEqual(
  939. queryset.values_list("start_datetime", "truncated"),
  940. [
  941. (start_datetime, truncated_start),
  942. (end_datetime, truncated_end),
  943. ],
  944. )
  945. date_truncations = ["year", "quarter", "month", "day"]
  946. time_truncations = ["hour", "minute", "second"]
  947. tests = [
  948. (assertDateKind, date_truncations),
  949. (assertTimeKind, time_truncations),
  950. (assertDatetimeKind, [*date_truncations, *time_truncations]),
  951. (assertDatetimeToTimeKind, time_truncations),
  952. ]
  953. for assertion, truncations in tests:
  954. for truncation in truncations:
  955. with self.subTest(assertion=assertion.__name__, truncation=truncation):
  956. assertion(truncation)
  957. qs = DTModel.objects.filter(
  958. start_datetime__date=Trunc(
  959. "start_datetime", "day", output_field=DateField()
  960. )
  961. )
  962. self.assertEqual(qs.count(), 2)
  963. def _test_trunc_week(self, start_datetime, end_datetime):
  964. if settings.USE_TZ:
  965. start_datetime = timezone.make_aware(start_datetime)
  966. end_datetime = timezone.make_aware(end_datetime)
  967. self.create_model(start_datetime, end_datetime)
  968. self.create_model(end_datetime, start_datetime)
  969. self.assertQuerySetEqual(
  970. DTModel.objects.annotate(
  971. truncated=Trunc("start_datetime", "week", output_field=DateTimeField())
  972. ).order_by("start_datetime"),
  973. [
  974. (start_datetime, truncate_to(start_datetime, "week")),
  975. (end_datetime, truncate_to(end_datetime, "week")),
  976. ],
  977. lambda m: (m.start_datetime, m.truncated),
  978. )
  979. self.assertQuerySetEqual(
  980. DTModel.objects.annotate(
  981. truncated=Trunc("start_date", "week", output_field=DateField())
  982. ).order_by("start_datetime"),
  983. [
  984. (start_datetime, truncate_to(start_datetime.date(), "week")),
  985. (end_datetime, truncate_to(end_datetime.date(), "week")),
  986. ],
  987. lambda m: (m.start_datetime, m.truncated),
  988. )
  989. def test_trunc_week(self):
  990. self._test_trunc_week(
  991. start_datetime=datetime.datetime(2015, 6, 15, 14, 30, 50, 321),
  992. end_datetime=datetime.datetime(2016, 6, 15, 14, 10, 50, 123),
  993. )
  994. def test_trunc_week_before_1000(self):
  995. self._test_trunc_week(
  996. start_datetime=datetime.datetime(999, 6, 15, 14, 30, 50, 321),
  997. end_datetime=datetime.datetime(2016, 6, 15, 14, 10, 50, 123),
  998. )
  999. def test_trunc_invalid_arguments(self):
  1000. msg = "output_field must be either DateField, TimeField, or DateTimeField"
  1001. with self.assertRaisesMessage(ValueError, msg):
  1002. list(
  1003. DTModel.objects.annotate(
  1004. truncated=Trunc(
  1005. "start_datetime", "year", output_field=IntegerField()
  1006. ),
  1007. )
  1008. )
  1009. msg = "'name' isn't a DateField, TimeField, or DateTimeField."
  1010. with self.assertRaisesMessage(TypeError, msg):
  1011. list(
  1012. DTModel.objects.annotate(
  1013. truncated=Trunc("name", "year", output_field=DateTimeField()),
  1014. )
  1015. )
  1016. msg = "Cannot truncate DateField 'start_date' to DateTimeField"
  1017. with self.assertRaisesMessage(ValueError, msg):
  1018. list(DTModel.objects.annotate(truncated=Trunc("start_date", "second")))
  1019. with self.assertRaisesMessage(ValueError, msg):
  1020. list(
  1021. DTModel.objects.annotate(
  1022. truncated=Trunc(
  1023. "start_date", "month", output_field=DateTimeField()
  1024. ),
  1025. )
  1026. )
  1027. msg = "Cannot truncate TimeField 'start_time' to DateTimeField"
  1028. with self.assertRaisesMessage(ValueError, msg):
  1029. list(DTModel.objects.annotate(truncated=Trunc("start_time", "month")))
  1030. with self.assertRaisesMessage(ValueError, msg):
  1031. list(
  1032. DTModel.objects.annotate(
  1033. truncated=Trunc(
  1034. "start_time", "second", output_field=DateTimeField()
  1035. ),
  1036. )
  1037. )
  1038. def test_trunc_none(self):
  1039. self.create_model(None, None)
  1040. for t in (
  1041. Trunc("start_datetime", "year"),
  1042. Trunc("start_date", "year"),
  1043. Trunc("start_time", "hour"),
  1044. ):
  1045. with self.subTest(t):
  1046. self.assertIsNone(
  1047. DTModel.objects.annotate(truncated=t).first().truncated
  1048. )
  1049. def test_trunc_year_func(self):
  1050. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1051. end_datetime = truncate_to(
  1052. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "year"
  1053. )
  1054. if settings.USE_TZ:
  1055. start_datetime = timezone.make_aware(start_datetime)
  1056. end_datetime = timezone.make_aware(end_datetime)
  1057. self.create_model(start_datetime, end_datetime)
  1058. self.create_model(end_datetime, start_datetime)
  1059. self.assertQuerySetEqual(
  1060. DTModel.objects.annotate(extracted=TruncYear("start_datetime")).order_by(
  1061. "start_datetime"
  1062. ),
  1063. [
  1064. (start_datetime, truncate_to(start_datetime, "year")),
  1065. (end_datetime, truncate_to(end_datetime, "year")),
  1066. ],
  1067. lambda m: (m.start_datetime, m.extracted),
  1068. )
  1069. self.assertQuerySetEqual(
  1070. DTModel.objects.annotate(extracted=TruncYear("start_date")).order_by(
  1071. "start_datetime"
  1072. ),
  1073. [
  1074. (start_datetime, truncate_to(start_datetime.date(), "year")),
  1075. (end_datetime, truncate_to(end_datetime.date(), "year")),
  1076. ],
  1077. lambda m: (m.start_datetime, m.extracted),
  1078. )
  1079. self.assertEqual(
  1080. DTModel.objects.filter(start_datetime=TruncYear("start_datetime")).count(),
  1081. 1,
  1082. )
  1083. with self.assertRaisesMessage(
  1084. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1085. ):
  1086. list(DTModel.objects.annotate(truncated=TruncYear("start_time")))
  1087. with self.assertRaisesMessage(
  1088. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1089. ):
  1090. list(
  1091. DTModel.objects.annotate(
  1092. truncated=TruncYear("start_time", output_field=TimeField())
  1093. )
  1094. )
  1095. def test_trunc_quarter_func(self):
  1096. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1097. end_datetime = truncate_to(
  1098. datetime.datetime(2016, 10, 15, 14, 10, 50, 123), "quarter"
  1099. )
  1100. last_quarter_2015 = truncate_to(
  1101. datetime.datetime(2015, 12, 31, 14, 10, 50, 123), "quarter"
  1102. )
  1103. first_quarter_2016 = truncate_to(
  1104. datetime.datetime(2016, 1, 1, 14, 10, 50, 123), "quarter"
  1105. )
  1106. if settings.USE_TZ:
  1107. start_datetime = timezone.make_aware(start_datetime)
  1108. end_datetime = timezone.make_aware(end_datetime)
  1109. last_quarter_2015 = timezone.make_aware(last_quarter_2015)
  1110. first_quarter_2016 = timezone.make_aware(first_quarter_2016)
  1111. self.create_model(start_datetime=start_datetime, end_datetime=end_datetime)
  1112. self.create_model(start_datetime=end_datetime, end_datetime=start_datetime)
  1113. self.create_model(start_datetime=last_quarter_2015, end_datetime=end_datetime)
  1114. self.create_model(start_datetime=first_quarter_2016, end_datetime=end_datetime)
  1115. self.assertQuerySetEqual(
  1116. DTModel.objects.annotate(extracted=TruncQuarter("start_date")).order_by(
  1117. "start_datetime"
  1118. ),
  1119. [
  1120. (start_datetime, truncate_to(start_datetime.date(), "quarter")),
  1121. (last_quarter_2015, truncate_to(last_quarter_2015.date(), "quarter")),
  1122. (first_quarter_2016, truncate_to(first_quarter_2016.date(), "quarter")),
  1123. (end_datetime, truncate_to(end_datetime.date(), "quarter")),
  1124. ],
  1125. lambda m: (m.start_datetime, m.extracted),
  1126. )
  1127. self.assertQuerySetEqual(
  1128. DTModel.objects.annotate(extracted=TruncQuarter("start_datetime")).order_by(
  1129. "start_datetime"
  1130. ),
  1131. [
  1132. (start_datetime, truncate_to(start_datetime, "quarter")),
  1133. (last_quarter_2015, truncate_to(last_quarter_2015, "quarter")),
  1134. (first_quarter_2016, truncate_to(first_quarter_2016, "quarter")),
  1135. (end_datetime, truncate_to(end_datetime, "quarter")),
  1136. ],
  1137. lambda m: (m.start_datetime, m.extracted),
  1138. )
  1139. with self.assertRaisesMessage(
  1140. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1141. ):
  1142. list(DTModel.objects.annotate(truncated=TruncQuarter("start_time")))
  1143. with self.assertRaisesMessage(
  1144. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1145. ):
  1146. list(
  1147. DTModel.objects.annotate(
  1148. truncated=TruncQuarter("start_time", output_field=TimeField())
  1149. )
  1150. )
  1151. def test_trunc_month_func(self):
  1152. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1153. end_datetime = truncate_to(
  1154. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "month"
  1155. )
  1156. if settings.USE_TZ:
  1157. start_datetime = timezone.make_aware(start_datetime)
  1158. end_datetime = timezone.make_aware(end_datetime)
  1159. self.create_model(start_datetime, end_datetime)
  1160. self.create_model(end_datetime, start_datetime)
  1161. self.assertQuerySetEqual(
  1162. DTModel.objects.annotate(extracted=TruncMonth("start_datetime")).order_by(
  1163. "start_datetime"
  1164. ),
  1165. [
  1166. (start_datetime, truncate_to(start_datetime, "month")),
  1167. (end_datetime, truncate_to(end_datetime, "month")),
  1168. ],
  1169. lambda m: (m.start_datetime, m.extracted),
  1170. )
  1171. self.assertQuerySetEqual(
  1172. DTModel.objects.annotate(extracted=TruncMonth("start_date")).order_by(
  1173. "start_datetime"
  1174. ),
  1175. [
  1176. (start_datetime, truncate_to(start_datetime.date(), "month")),
  1177. (end_datetime, truncate_to(end_datetime.date(), "month")),
  1178. ],
  1179. lambda m: (m.start_datetime, m.extracted),
  1180. )
  1181. self.assertEqual(
  1182. DTModel.objects.filter(start_datetime=TruncMonth("start_datetime")).count(),
  1183. 1,
  1184. )
  1185. with self.assertRaisesMessage(
  1186. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1187. ):
  1188. list(DTModel.objects.annotate(truncated=TruncMonth("start_time")))
  1189. with self.assertRaisesMessage(
  1190. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1191. ):
  1192. list(
  1193. DTModel.objects.annotate(
  1194. truncated=TruncMonth("start_time", output_field=TimeField())
  1195. )
  1196. )
  1197. def test_trunc_week_func(self):
  1198. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1199. end_datetime = truncate_to(
  1200. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "week"
  1201. )
  1202. if settings.USE_TZ:
  1203. start_datetime = timezone.make_aware(start_datetime)
  1204. end_datetime = timezone.make_aware(end_datetime)
  1205. self.create_model(start_datetime, end_datetime)
  1206. self.create_model(end_datetime, start_datetime)
  1207. self.assertQuerySetEqual(
  1208. DTModel.objects.annotate(extracted=TruncWeek("start_datetime")).order_by(
  1209. "start_datetime"
  1210. ),
  1211. [
  1212. (start_datetime, truncate_to(start_datetime, "week")),
  1213. (end_datetime, truncate_to(end_datetime, "week")),
  1214. ],
  1215. lambda m: (m.start_datetime, m.extracted),
  1216. )
  1217. self.assertEqual(
  1218. DTModel.objects.filter(start_datetime=TruncWeek("start_datetime")).count(),
  1219. 1,
  1220. )
  1221. with self.assertRaisesMessage(
  1222. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1223. ):
  1224. list(DTModel.objects.annotate(truncated=TruncWeek("start_time")))
  1225. with self.assertRaisesMessage(
  1226. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1227. ):
  1228. list(
  1229. DTModel.objects.annotate(
  1230. truncated=TruncWeek("start_time", output_field=TimeField())
  1231. )
  1232. )
  1233. def test_trunc_date_func(self):
  1234. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1235. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  1236. if settings.USE_TZ:
  1237. start_datetime = timezone.make_aware(start_datetime)
  1238. end_datetime = timezone.make_aware(end_datetime)
  1239. self.create_model(start_datetime, end_datetime)
  1240. self.create_model(end_datetime, start_datetime)
  1241. self.assertQuerySetEqual(
  1242. DTModel.objects.annotate(extracted=TruncDate("start_datetime")).order_by(
  1243. "start_datetime"
  1244. ),
  1245. [
  1246. (start_datetime, start_datetime.date()),
  1247. (end_datetime, end_datetime.date()),
  1248. ],
  1249. lambda m: (m.start_datetime, m.extracted),
  1250. )
  1251. self.assertEqual(
  1252. DTModel.objects.filter(
  1253. start_datetime__date=TruncDate("start_datetime")
  1254. ).count(),
  1255. 2,
  1256. )
  1257. with self.assertRaisesMessage(
  1258. ValueError, "Cannot truncate TimeField 'start_time' to DateField"
  1259. ):
  1260. list(DTModel.objects.annotate(truncated=TruncDate("start_time")))
  1261. with self.assertRaisesMessage(
  1262. ValueError, "Cannot truncate TimeField 'start_time' to DateField"
  1263. ):
  1264. list(
  1265. DTModel.objects.annotate(
  1266. truncated=TruncDate("start_time", output_field=TimeField())
  1267. )
  1268. )
  1269. def test_trunc_date_none(self):
  1270. self.create_model(None, None)
  1271. self.assertIsNone(
  1272. DTModel.objects.annotate(truncated=TruncDate("start_datetime"))
  1273. .first()
  1274. .truncated
  1275. )
  1276. def test_trunc_time_func(self):
  1277. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1278. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  1279. if settings.USE_TZ:
  1280. start_datetime = timezone.make_aware(start_datetime)
  1281. end_datetime = timezone.make_aware(end_datetime)
  1282. self.create_model(start_datetime, end_datetime)
  1283. self.create_model(end_datetime, start_datetime)
  1284. self.assertQuerySetEqual(
  1285. DTModel.objects.annotate(extracted=TruncTime("start_datetime")).order_by(
  1286. "start_datetime"
  1287. ),
  1288. [
  1289. (start_datetime, start_datetime.time()),
  1290. (end_datetime, end_datetime.time()),
  1291. ],
  1292. lambda m: (m.start_datetime, m.extracted),
  1293. )
  1294. self.assertEqual(
  1295. DTModel.objects.filter(
  1296. start_datetime__time=TruncTime("start_datetime")
  1297. ).count(),
  1298. 2,
  1299. )
  1300. with self.assertRaisesMessage(
  1301. ValueError, "Cannot truncate DateField 'start_date' to TimeField"
  1302. ):
  1303. list(DTModel.objects.annotate(truncated=TruncTime("start_date")))
  1304. with self.assertRaisesMessage(
  1305. ValueError, "Cannot truncate DateField 'start_date' to TimeField"
  1306. ):
  1307. list(
  1308. DTModel.objects.annotate(
  1309. truncated=TruncTime("start_date", output_field=DateField())
  1310. )
  1311. )
  1312. def test_trunc_time_none(self):
  1313. self.create_model(None, None)
  1314. self.assertIsNone(
  1315. DTModel.objects.annotate(truncated=TruncTime("start_datetime"))
  1316. .first()
  1317. .truncated
  1318. )
  1319. def test_trunc_time_comparison(self):
  1320. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 26) # 0 microseconds.
  1321. end_datetime = datetime.datetime(2015, 6, 15, 14, 30, 26, 321)
  1322. if settings.USE_TZ:
  1323. start_datetime = timezone.make_aware(start_datetime)
  1324. end_datetime = timezone.make_aware(end_datetime)
  1325. self.create_model(start_datetime, end_datetime)
  1326. self.assertIs(
  1327. DTModel.objects.filter(
  1328. start_datetime__time=start_datetime.time(),
  1329. end_datetime__time=end_datetime.time(),
  1330. ).exists(),
  1331. True,
  1332. )
  1333. self.assertIs(
  1334. DTModel.objects.annotate(
  1335. extracted_start=TruncTime("start_datetime"),
  1336. extracted_end=TruncTime("end_datetime"),
  1337. )
  1338. .filter(
  1339. extracted_start=start_datetime.time(),
  1340. extracted_end=end_datetime.time(),
  1341. )
  1342. .exists(),
  1343. True,
  1344. )
  1345. def test_trunc_day_func(self):
  1346. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1347. end_datetime = truncate_to(
  1348. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "day"
  1349. )
  1350. if settings.USE_TZ:
  1351. start_datetime = timezone.make_aware(start_datetime)
  1352. end_datetime = timezone.make_aware(end_datetime)
  1353. self.create_model(start_datetime, end_datetime)
  1354. self.create_model(end_datetime, start_datetime)
  1355. self.assertQuerySetEqual(
  1356. DTModel.objects.annotate(extracted=TruncDay("start_datetime")).order_by(
  1357. "start_datetime"
  1358. ),
  1359. [
  1360. (start_datetime, truncate_to(start_datetime, "day")),
  1361. (end_datetime, truncate_to(end_datetime, "day")),
  1362. ],
  1363. lambda m: (m.start_datetime, m.extracted),
  1364. )
  1365. self.assertEqual(
  1366. DTModel.objects.filter(start_datetime=TruncDay("start_datetime")).count(), 1
  1367. )
  1368. with self.assertRaisesMessage(
  1369. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1370. ):
  1371. list(DTModel.objects.annotate(truncated=TruncDay("start_time")))
  1372. with self.assertRaisesMessage(
  1373. ValueError, "Cannot truncate TimeField 'start_time' to DateTimeField"
  1374. ):
  1375. list(
  1376. DTModel.objects.annotate(
  1377. truncated=TruncDay("start_time", output_field=TimeField())
  1378. )
  1379. )
  1380. def test_trunc_hour_func(self):
  1381. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1382. end_datetime = truncate_to(
  1383. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "hour"
  1384. )
  1385. if settings.USE_TZ:
  1386. start_datetime = timezone.make_aware(start_datetime)
  1387. end_datetime = timezone.make_aware(end_datetime)
  1388. self.create_model(start_datetime, end_datetime)
  1389. self.create_model(end_datetime, start_datetime)
  1390. self.assertQuerySetEqual(
  1391. DTModel.objects.annotate(extracted=TruncHour("start_datetime")).order_by(
  1392. "start_datetime"
  1393. ),
  1394. [
  1395. (start_datetime, truncate_to(start_datetime, "hour")),
  1396. (end_datetime, truncate_to(end_datetime, "hour")),
  1397. ],
  1398. lambda m: (m.start_datetime, m.extracted),
  1399. )
  1400. self.assertQuerySetEqual(
  1401. DTModel.objects.annotate(extracted=TruncHour("start_time")).order_by(
  1402. "start_datetime"
  1403. ),
  1404. [
  1405. (start_datetime, truncate_to(start_datetime.time(), "hour")),
  1406. (end_datetime, truncate_to(end_datetime.time(), "hour")),
  1407. ],
  1408. lambda m: (m.start_datetime, m.extracted),
  1409. )
  1410. self.assertEqual(
  1411. DTModel.objects.filter(start_datetime=TruncHour("start_datetime")).count(),
  1412. 1,
  1413. )
  1414. with self.assertRaisesMessage(
  1415. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1416. ):
  1417. list(DTModel.objects.annotate(truncated=TruncHour("start_date")))
  1418. with self.assertRaisesMessage(
  1419. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1420. ):
  1421. list(
  1422. DTModel.objects.annotate(
  1423. truncated=TruncHour("start_date", output_field=DateField())
  1424. )
  1425. )
  1426. def test_trunc_minute_func(self):
  1427. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1428. end_datetime = truncate_to(
  1429. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "minute"
  1430. )
  1431. if settings.USE_TZ:
  1432. start_datetime = timezone.make_aware(start_datetime)
  1433. end_datetime = timezone.make_aware(end_datetime)
  1434. self.create_model(start_datetime, end_datetime)
  1435. self.create_model(end_datetime, start_datetime)
  1436. self.assertQuerySetEqual(
  1437. DTModel.objects.annotate(extracted=TruncMinute("start_datetime")).order_by(
  1438. "start_datetime"
  1439. ),
  1440. [
  1441. (start_datetime, truncate_to(start_datetime, "minute")),
  1442. (end_datetime, truncate_to(end_datetime, "minute")),
  1443. ],
  1444. lambda m: (m.start_datetime, m.extracted),
  1445. )
  1446. self.assertQuerySetEqual(
  1447. DTModel.objects.annotate(extracted=TruncMinute("start_time")).order_by(
  1448. "start_datetime"
  1449. ),
  1450. [
  1451. (start_datetime, truncate_to(start_datetime.time(), "minute")),
  1452. (end_datetime, truncate_to(end_datetime.time(), "minute")),
  1453. ],
  1454. lambda m: (m.start_datetime, m.extracted),
  1455. )
  1456. self.assertEqual(
  1457. DTModel.objects.filter(
  1458. start_datetime=TruncMinute("start_datetime")
  1459. ).count(),
  1460. 1,
  1461. )
  1462. with self.assertRaisesMessage(
  1463. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1464. ):
  1465. list(DTModel.objects.annotate(truncated=TruncMinute("start_date")))
  1466. with self.assertRaisesMessage(
  1467. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1468. ):
  1469. list(
  1470. DTModel.objects.annotate(
  1471. truncated=TruncMinute("start_date", output_field=DateField())
  1472. )
  1473. )
  1474. def test_trunc_second_func(self):
  1475. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1476. end_datetime = truncate_to(
  1477. datetime.datetime(2016, 6, 15, 14, 10, 50, 123), "second"
  1478. )
  1479. if settings.USE_TZ:
  1480. start_datetime = timezone.make_aware(start_datetime)
  1481. end_datetime = timezone.make_aware(end_datetime)
  1482. self.create_model(start_datetime, end_datetime)
  1483. self.create_model(end_datetime, start_datetime)
  1484. self.assertQuerySetEqual(
  1485. DTModel.objects.annotate(extracted=TruncSecond("start_datetime")).order_by(
  1486. "start_datetime"
  1487. ),
  1488. [
  1489. (start_datetime, truncate_to(start_datetime, "second")),
  1490. (end_datetime, truncate_to(end_datetime, "second")),
  1491. ],
  1492. lambda m: (m.start_datetime, m.extracted),
  1493. )
  1494. self.assertQuerySetEqual(
  1495. DTModel.objects.annotate(extracted=TruncSecond("start_time")).order_by(
  1496. "start_datetime"
  1497. ),
  1498. [
  1499. (start_datetime, truncate_to(start_datetime.time(), "second")),
  1500. (end_datetime, truncate_to(end_datetime.time(), "second")),
  1501. ],
  1502. lambda m: (m.start_datetime, m.extracted),
  1503. )
  1504. self.assertEqual(
  1505. DTModel.objects.filter(
  1506. start_datetime=TruncSecond("start_datetime")
  1507. ).count(),
  1508. 1,
  1509. )
  1510. with self.assertRaisesMessage(
  1511. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1512. ):
  1513. list(DTModel.objects.annotate(truncated=TruncSecond("start_date")))
  1514. with self.assertRaisesMessage(
  1515. ValueError, "Cannot truncate DateField 'start_date' to DateTimeField"
  1516. ):
  1517. list(
  1518. DTModel.objects.annotate(
  1519. truncated=TruncSecond("start_date", output_field=DateField())
  1520. )
  1521. )
  1522. def test_trunc_subquery_with_parameters(self):
  1523. author_1 = Author.objects.create(name="J. R. R. Tolkien")
  1524. author_2 = Author.objects.create(name="G. R. R. Martin")
  1525. fan_since_1 = datetime.datetime(2016, 2, 3, 15, 0, 0)
  1526. fan_since_2 = datetime.datetime(2015, 2, 3, 15, 0, 0)
  1527. fan_since_3 = datetime.datetime(2017, 2, 3, 15, 0, 0)
  1528. if settings.USE_TZ:
  1529. fan_since_1 = timezone.make_aware(fan_since_1)
  1530. fan_since_2 = timezone.make_aware(fan_since_2)
  1531. fan_since_3 = timezone.make_aware(fan_since_3)
  1532. Fan.objects.create(author=author_1, name="Tom", fan_since=fan_since_1)
  1533. Fan.objects.create(author=author_1, name="Emma", fan_since=fan_since_2)
  1534. Fan.objects.create(author=author_2, name="Isabella", fan_since=fan_since_3)
  1535. inner = (
  1536. Fan.objects.filter(
  1537. author=OuterRef("pk"), name__in=("Emma", "Isabella", "Tom")
  1538. )
  1539. .values("author")
  1540. .annotate(newest_fan=Max("fan_since"))
  1541. .values("newest_fan")
  1542. )
  1543. outer = Author.objects.annotate(
  1544. newest_fan_year=TruncYear(Subquery(inner, output_field=DateTimeField()))
  1545. )
  1546. tz = datetime.UTC if settings.USE_TZ else None
  1547. self.assertSequenceEqual(
  1548. outer.order_by("name").values("name", "newest_fan_year"),
  1549. [
  1550. {
  1551. "name": "G. R. R. Martin",
  1552. "newest_fan_year": datetime.datetime(2017, 1, 1, 0, 0, tzinfo=tz),
  1553. },
  1554. {
  1555. "name": "J. R. R. Tolkien",
  1556. "newest_fan_year": datetime.datetime(2016, 1, 1, 0, 0, tzinfo=tz),
  1557. },
  1558. ],
  1559. )
  1560. def test_extract_outerref(self):
  1561. datetime_1 = datetime.datetime(2000, 1, 1)
  1562. datetime_2 = datetime.datetime(2001, 3, 5)
  1563. datetime_3 = datetime.datetime(2002, 1, 3)
  1564. if settings.USE_TZ:
  1565. datetime_1 = timezone.make_aware(datetime_1)
  1566. datetime_2 = timezone.make_aware(datetime_2)
  1567. datetime_3 = timezone.make_aware(datetime_3)
  1568. obj_1 = self.create_model(datetime_1, datetime_3)
  1569. obj_2 = self.create_model(datetime_2, datetime_1)
  1570. obj_3 = self.create_model(datetime_3, datetime_2)
  1571. inner_qs = DTModel.objects.filter(
  1572. start_datetime__year=2000,
  1573. start_datetime__month=ExtractMonth(OuterRef("end_datetime")),
  1574. )
  1575. qs = DTModel.objects.annotate(
  1576. related_pk=Subquery(inner_qs.values("pk")[:1]),
  1577. )
  1578. self.assertSequenceEqual(
  1579. qs.order_by("name").values("pk", "related_pk"),
  1580. [
  1581. {"pk": obj_1.pk, "related_pk": obj_1.pk},
  1582. {"pk": obj_2.pk, "related_pk": obj_1.pk},
  1583. {"pk": obj_3.pk, "related_pk": None},
  1584. ],
  1585. )
  1586. @override_settings(USE_TZ=True, TIME_ZONE="UTC")
  1587. class DateFunctionWithTimeZoneTests(DateFunctionTests):
  1588. def test_extract_func_with_timezone(self):
  1589. start_datetime = datetime.datetime(2015, 6, 15, 23, 30, 1, 321)
  1590. end_datetime = datetime.datetime(2015, 6, 16, 13, 11, 27, 123)
  1591. start_datetime = timezone.make_aware(start_datetime)
  1592. end_datetime = timezone.make_aware(end_datetime)
  1593. self.create_model(start_datetime, end_datetime)
  1594. delta_tzinfo_pos = datetime.timezone(datetime.timedelta(hours=5))
  1595. delta_tzinfo_neg = datetime.timezone(datetime.timedelta(hours=-5, minutes=17))
  1596. melb = zoneinfo.ZoneInfo("Australia/Melbourne")
  1597. qs = DTModel.objects.annotate(
  1598. day=Extract("start_datetime", "day"),
  1599. day_melb=Extract("start_datetime", "day", tzinfo=melb),
  1600. week=Extract("start_datetime", "week", tzinfo=melb),
  1601. isoyear=ExtractIsoYear("start_datetime", tzinfo=melb),
  1602. weekday=ExtractWeekDay("start_datetime"),
  1603. weekday_melb=ExtractWeekDay("start_datetime", tzinfo=melb),
  1604. isoweekday=ExtractIsoWeekDay("start_datetime"),
  1605. isoweekday_melb=ExtractIsoWeekDay("start_datetime", tzinfo=melb),
  1606. quarter=ExtractQuarter("start_datetime", tzinfo=melb),
  1607. hour=ExtractHour("start_datetime"),
  1608. hour_melb=ExtractHour("start_datetime", tzinfo=melb),
  1609. hour_with_delta_pos=ExtractHour("start_datetime", tzinfo=delta_tzinfo_pos),
  1610. hour_with_delta_neg=ExtractHour("start_datetime", tzinfo=delta_tzinfo_neg),
  1611. minute_with_delta_neg=ExtractMinute(
  1612. "start_datetime", tzinfo=delta_tzinfo_neg
  1613. ),
  1614. ).order_by("start_datetime")
  1615. utc_model = qs.get()
  1616. self.assertEqual(utc_model.day, 15)
  1617. self.assertEqual(utc_model.day_melb, 16)
  1618. self.assertEqual(utc_model.week, 25)
  1619. self.assertEqual(utc_model.isoyear, 2015)
  1620. self.assertEqual(utc_model.weekday, 2)
  1621. self.assertEqual(utc_model.weekday_melb, 3)
  1622. self.assertEqual(utc_model.isoweekday, 1)
  1623. self.assertEqual(utc_model.isoweekday_melb, 2)
  1624. self.assertEqual(utc_model.quarter, 2)
  1625. self.assertEqual(utc_model.hour, 23)
  1626. self.assertEqual(utc_model.hour_melb, 9)
  1627. self.assertEqual(utc_model.hour_with_delta_pos, 4)
  1628. self.assertEqual(utc_model.hour_with_delta_neg, 18)
  1629. self.assertEqual(utc_model.minute_with_delta_neg, 47)
  1630. with timezone.override(melb):
  1631. melb_model = qs.get()
  1632. self.assertEqual(melb_model.day, 16)
  1633. self.assertEqual(melb_model.day_melb, 16)
  1634. self.assertEqual(melb_model.week, 25)
  1635. self.assertEqual(melb_model.isoyear, 2015)
  1636. self.assertEqual(melb_model.weekday, 3)
  1637. self.assertEqual(melb_model.isoweekday, 2)
  1638. self.assertEqual(melb_model.quarter, 2)
  1639. self.assertEqual(melb_model.weekday_melb, 3)
  1640. self.assertEqual(melb_model.isoweekday_melb, 2)
  1641. self.assertEqual(melb_model.hour, 9)
  1642. self.assertEqual(melb_model.hour_melb, 9)
  1643. def test_extract_func_with_timezone_minus_no_offset(self):
  1644. start_datetime = datetime.datetime(2015, 6, 15, 23, 30, 1, 321)
  1645. end_datetime = datetime.datetime(2015, 6, 16, 13, 11, 27, 123)
  1646. start_datetime = timezone.make_aware(start_datetime)
  1647. end_datetime = timezone.make_aware(end_datetime)
  1648. self.create_model(start_datetime, end_datetime)
  1649. ust_nera = zoneinfo.ZoneInfo("Asia/Ust-Nera")
  1650. qs = DTModel.objects.annotate(
  1651. hour=ExtractHour("start_datetime"),
  1652. hour_tz=ExtractHour("start_datetime", tzinfo=ust_nera),
  1653. ).order_by("start_datetime")
  1654. utc_model = qs.get()
  1655. self.assertEqual(utc_model.hour, 23)
  1656. self.assertEqual(utc_model.hour_tz, 9)
  1657. with timezone.override(ust_nera):
  1658. ust_nera_model = qs.get()
  1659. self.assertEqual(ust_nera_model.hour, 9)
  1660. self.assertEqual(ust_nera_model.hour_tz, 9)
  1661. def test_extract_func_explicit_timezone_priority(self):
  1662. start_datetime = datetime.datetime(2015, 6, 15, 23, 30, 1, 321)
  1663. end_datetime = datetime.datetime(2015, 6, 16, 13, 11, 27, 123)
  1664. start_datetime = timezone.make_aware(start_datetime)
  1665. end_datetime = timezone.make_aware(end_datetime)
  1666. self.create_model(start_datetime, end_datetime)
  1667. melb = zoneinfo.ZoneInfo("Australia/Melbourne")
  1668. with timezone.override(melb):
  1669. model = (
  1670. DTModel.objects.annotate(
  1671. day_melb=Extract("start_datetime", "day"),
  1672. day_utc=Extract("start_datetime", "day", tzinfo=datetime.UTC),
  1673. )
  1674. .order_by("start_datetime")
  1675. .get()
  1676. )
  1677. self.assertEqual(model.day_melb, 16)
  1678. self.assertEqual(model.day_utc, 15)
  1679. def test_extract_invalid_field_with_timezone(self):
  1680. melb = zoneinfo.ZoneInfo("Australia/Melbourne")
  1681. msg = "tzinfo can only be used with DateTimeField."
  1682. with self.assertRaisesMessage(ValueError, msg):
  1683. DTModel.objects.annotate(
  1684. day_melb=Extract("start_date", "day", tzinfo=melb),
  1685. ).get()
  1686. with self.assertRaisesMessage(ValueError, msg):
  1687. DTModel.objects.annotate(
  1688. hour_melb=Extract("start_time", "hour", tzinfo=melb),
  1689. ).get()
  1690. def test_trunc_timezone_applied_before_truncation(self):
  1691. start_datetime = datetime.datetime(2016, 1, 1, 1, 30, 50, 321)
  1692. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  1693. start_datetime = timezone.make_aware(start_datetime)
  1694. end_datetime = timezone.make_aware(end_datetime)
  1695. self.create_model(start_datetime, end_datetime)
  1696. melb = zoneinfo.ZoneInfo("Australia/Melbourne")
  1697. pacific = zoneinfo.ZoneInfo("America/Los_Angeles")
  1698. model = (
  1699. DTModel.objects.annotate(
  1700. melb_year=TruncYear("start_datetime", tzinfo=melb),
  1701. pacific_year=TruncYear("start_datetime", tzinfo=pacific),
  1702. melb_date=TruncDate("start_datetime", tzinfo=melb),
  1703. pacific_date=TruncDate("start_datetime", tzinfo=pacific),
  1704. melb_time=TruncTime("start_datetime", tzinfo=melb),
  1705. pacific_time=TruncTime("start_datetime", tzinfo=pacific),
  1706. )
  1707. .order_by("start_datetime")
  1708. .get()
  1709. )
  1710. melb_start_datetime = start_datetime.astimezone(melb)
  1711. pacific_start_datetime = start_datetime.astimezone(pacific)
  1712. self.assertEqual(model.start_datetime, start_datetime)
  1713. self.assertEqual(model.melb_year, truncate_to(start_datetime, "year", melb))
  1714. self.assertEqual(
  1715. model.pacific_year, truncate_to(start_datetime, "year", pacific)
  1716. )
  1717. self.assertEqual(model.start_datetime.year, 2016)
  1718. self.assertEqual(model.melb_year.year, 2016)
  1719. self.assertEqual(model.pacific_year.year, 2015)
  1720. self.assertEqual(model.melb_date, melb_start_datetime.date())
  1721. self.assertEqual(model.pacific_date, pacific_start_datetime.date())
  1722. self.assertEqual(model.melb_time, melb_start_datetime.time())
  1723. self.assertEqual(model.pacific_time, pacific_start_datetime.time())
  1724. def test_trunc_func_with_timezone(self):
  1725. """
  1726. If the truncated datetime transitions to a different offset (daylight
  1727. saving) then the returned value will have that new timezone/offset.
  1728. """
  1729. start_datetime = datetime.datetime(2015, 6, 15, 14, 30, 50, 321)
  1730. end_datetime = datetime.datetime(2016, 6, 15, 14, 10, 50, 123)
  1731. start_datetime = timezone.make_aware(start_datetime)
  1732. end_datetime = timezone.make_aware(end_datetime)
  1733. self.create_model(start_datetime, end_datetime)
  1734. self.create_model(end_datetime, start_datetime)
  1735. def assertDatetimeKind(kind, tzinfo):
  1736. truncated_start = truncate_to(
  1737. start_datetime.astimezone(tzinfo), kind, tzinfo
  1738. )
  1739. truncated_end = truncate_to(end_datetime.astimezone(tzinfo), kind, tzinfo)
  1740. queryset = DTModel.objects.annotate(
  1741. truncated=Trunc(
  1742. "start_datetime",
  1743. kind,
  1744. output_field=DateTimeField(),
  1745. tzinfo=tzinfo,
  1746. )
  1747. ).order_by("start_datetime")
  1748. self.assertSequenceEqual(
  1749. queryset.values_list("start_datetime", "truncated"),
  1750. [
  1751. (start_datetime, truncated_start),
  1752. (end_datetime, truncated_end),
  1753. ],
  1754. )
  1755. def assertDatetimeToDateKind(kind, tzinfo):
  1756. truncated_start = truncate_to(
  1757. start_datetime.astimezone(tzinfo).date(), kind
  1758. )
  1759. truncated_end = truncate_to(end_datetime.astimezone(tzinfo).date(), kind)
  1760. queryset = DTModel.objects.annotate(
  1761. truncated=Trunc(
  1762. "start_datetime",
  1763. kind,
  1764. output_field=DateField(),
  1765. tzinfo=tzinfo,
  1766. ),
  1767. ).order_by("start_datetime")
  1768. self.assertSequenceEqual(
  1769. queryset.values_list("start_datetime", "truncated"),
  1770. [
  1771. (start_datetime, truncated_start),
  1772. (end_datetime, truncated_end),
  1773. ],
  1774. )
  1775. def assertDatetimeToTimeKind(kind, tzinfo):
  1776. truncated_start = truncate_to(
  1777. start_datetime.astimezone(tzinfo).time(), kind
  1778. )
  1779. truncated_end = truncate_to(end_datetime.astimezone(tzinfo).time(), kind)
  1780. queryset = DTModel.objects.annotate(
  1781. truncated=Trunc(
  1782. "start_datetime",
  1783. kind,
  1784. output_field=TimeField(),
  1785. tzinfo=tzinfo,
  1786. )
  1787. ).order_by("start_datetime")
  1788. self.assertSequenceEqual(
  1789. queryset.values_list("start_datetime", "truncated"),
  1790. [
  1791. (start_datetime, truncated_start),
  1792. (end_datetime, truncated_end),
  1793. ],
  1794. )
  1795. timezones = [
  1796. zoneinfo.ZoneInfo("Australia/Melbourne"),
  1797. zoneinfo.ZoneInfo("Etc/GMT+10"),
  1798. ]
  1799. date_truncations = ["year", "quarter", "month", "week", "day"]
  1800. time_truncations = ["hour", "minute", "second"]
  1801. tests = [
  1802. (assertDatetimeToDateKind, date_truncations),
  1803. (assertDatetimeToTimeKind, time_truncations),
  1804. (assertDatetimeKind, [*date_truncations, *time_truncations]),
  1805. ]
  1806. for assertion, truncations in tests:
  1807. for truncation in truncations:
  1808. for tzinfo in timezones:
  1809. with self.subTest(
  1810. assertion=assertion.__name__,
  1811. truncation=truncation,
  1812. tzinfo=tzinfo.key,
  1813. ):
  1814. assertion(truncation, tzinfo)
  1815. qs = DTModel.objects.filter(
  1816. start_datetime__date=Trunc(
  1817. "start_datetime", "day", output_field=DateField()
  1818. )
  1819. )
  1820. self.assertEqual(qs.count(), 2)
  1821. def test_trunc_invalid_field_with_timezone(self):
  1822. melb = zoneinfo.ZoneInfo("Australia/Melbourne")
  1823. msg = "tzinfo can only be used with DateTimeField."
  1824. with self.assertRaisesMessage(ValueError, msg):
  1825. DTModel.objects.annotate(
  1826. day_melb=Trunc("start_date", "day", tzinfo=melb),
  1827. ).get()
  1828. with self.assertRaisesMessage(ValueError, msg):
  1829. DTModel.objects.annotate(
  1830. hour_melb=Trunc("start_time", "hour", tzinfo=melb),
  1831. ).get()