# greenthreads.py -- Utility module for querying an ObjectStore with gevent
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Fabien Boucher <fabien.boucher@enovance.com>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Utility module for querying an ObjectStore with gevent."""
from typing import Callable, Optional

import gevent
from gevent import pool

from .object_store import (
    BaseObjectStore,
    MissingObjectFinder,
    _collect_ancestors,
    _collect_filetree_revs,
)
from .objects import Commit, ObjectID, Tag
  34. def _split_commits_and_tags(
  35. obj_store: BaseObjectStore,
  36. lst: list[ObjectID],
  37. *,
  38. ignore_unknown: bool = False,
  39. pool: pool.Pool,
  40. ) -> tuple[set[ObjectID], set[ObjectID]]:
  41. """Split object id list into two list with commit SHA1s and tag SHA1s.
  42. Same implementation as object_store._split_commits_and_tags
  43. except we use gevent to parallelize object retrieval.
  44. """
  45. commits = set()
  46. tags = set()
  47. def find_commit_type(sha: ObjectID) -> None:
  48. try:
  49. o = obj_store[sha]
  50. except KeyError:
  51. if not ignore_unknown:
  52. raise
  53. else:
  54. if isinstance(o, Commit):
  55. commits.add(sha)
  56. elif isinstance(o, Tag):
  57. tags.add(sha)
  58. commits.add(o.object[1])
  59. else:
  60. raise KeyError(f"Not a commit or a tag: {sha!r}")
  61. jobs = [pool.spawn(find_commit_type, s) for s in lst]
  62. gevent.joinall(jobs)
  63. return (commits, tags)
  64. class GreenThreadsMissingObjectFinder(MissingObjectFinder):
  65. """Find the objects missing from another object store.
  66. Same implementation as object_store.MissingObjectFinder
  67. except we use gevent to parallelize object retrieval.
  68. """
  69. def __init__(
  70. self,
  71. object_store: BaseObjectStore,
  72. haves: list[ObjectID],
  73. wants: list[ObjectID],
  74. progress: Optional[Callable[[bytes], None]] = None,
  75. get_tagged: Optional[Callable[[], dict[ObjectID, ObjectID]]] = None,
  76. concurrency: int = 1,
  77. get_parents: Optional[Callable[[ObjectID], list[ObjectID]]] = None,
  78. ) -> None:
  79. """Initialize GreenThreadsMissingObjectFinder.
  80. Args:
  81. object_store: Object store to search
  82. haves: Objects we have
  83. wants: Objects we want
  84. progress: Optional progress callback
  85. get_tagged: Optional function to get tagged objects
  86. concurrency: Number of concurrent green threads
  87. get_parents: Optional function to get commit parents
  88. """
  89. def collect_tree_sha(sha: ObjectID) -> None:
  90. self.sha_done.add(sha)
  91. obj = object_store[sha]
  92. if isinstance(obj, Commit):
  93. _collect_filetree_revs(object_store, obj.tree, self.sha_done)
  94. self.object_store = object_store
  95. p = pool.Pool(size=concurrency)
  96. have_commits, have_tags = _split_commits_and_tags(
  97. object_store, haves, ignore_unknown=True, pool=p
  98. )
  99. want_commits, want_tags = _split_commits_and_tags(
  100. object_store, wants, ignore_unknown=False, pool=p
  101. )
  102. all_ancestors: frozenset[ObjectID] = frozenset(
  103. _collect_ancestors(object_store, have_commits)[0]
  104. )
  105. missing_commits, common_commits = _collect_ancestors(
  106. object_store, want_commits, all_ancestors
  107. )
  108. self.sha_done = set()
  109. jobs = [p.spawn(collect_tree_sha, c) for c in common_commits]
  110. gevent.joinall(jobs)
  111. for t in have_tags:
  112. self.sha_done.add(t)
  113. missing_tags = want_tags.difference(have_tags)
  114. all_wants = missing_commits.union(missing_tags)
  115. self.objects_to_send: set[
  116. tuple[ObjectID, Optional[bytes], Optional[int], bool]
  117. ] = {(w, None, 0, False) for w in all_wants}
  118. if progress is None:
  119. self.progress: Callable[[bytes], None] = lambda x: None
  120. else:
  121. self.progress = progress
  122. self._tagged = (get_tagged and get_tagged()) or {}