2
0

greenthreads.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # greenthreads.py -- Utility module for querying an ObjectStore with gevent
  2. # Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
  3. #
  4. # Author: Fabien Boucher <fabien.boucher@enovance.com>
  5. #
  6. # This program is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU General Public License
  8. # as published by the Free Software Foundation; version 2
  9. # of the License or (at your option) any later version of
  10. # the License.
  11. #
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with this program; if not, write to the Free Software
  19. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  20. # MA 02110-1301, USA.
  21. """Utility module for querying an ObjectStore with gevent."""
  22. import gevent
  23. from gevent import pool
  24. from dulwich.objects import (
  25. Commit,
  26. Tag,
  27. )
  28. from dulwich.object_store import (
  29. MissingObjectFinder,
  30. _collect_filetree_revs,
  31. ObjectStoreIterator,
  32. )
  def _split_commits_and_tags(obj_store, lst,
                              ignore_unknown=False, pool=None):
      """Split object id list into two sets with commit SHA1s and tag SHA1s.

      Same implementation as object_store._split_commits_and_tags
      except we use gevent to parallelize object retrieval.

      :param obj_store: Object store, indexed by SHA1 (``obj_store[sha]``)
      :param lst: Iterable of SHA1s to classify
      :param ignore_unknown: If True, SHA1s missing from ``obj_store`` are
          skipped instead of raising KeyError
      :param pool: Optional gevent pool used to spawn one lookup per SHA1.
          When None, the lookups are performed sequentially in the caller.
      :return: Tuple ``(commits, tags)`` of two sets of SHA1s
      :raise KeyError: If a SHA1 is missing from the store (and
          ``ignore_unknown`` is False) or refers to an object that is neither
          a commit nor a tag
      """
      commits = set()
      tags = set()

      def find_commit_type(sha):
          try:
              o = obj_store[sha]
          except KeyError:
              if not ignore_unknown:
                  raise
          else:
              if isinstance(o, Commit):
                  commits.add(sha)
              elif isinstance(o, Tag):
                  tags.add(sha)
                  # o.object[1] is presumably the SHA1 of the tagged object;
                  # it is recorded as a commit without further checking.
                  commits.add(o.object[1])
              else:
                  raise KeyError('Not a commit or a tag: %s' % sha)

      if pool is None:
          # The default used to crash on ``None.spawn``; without a pool there
          # is nothing to parallelize, so classify the ids one by one.
          for s in lst:
              find_commit_type(s)
      else:
          jobs = [pool.spawn(find_commit_type, s) for s in lst]
          # By default joinall() swallows exceptions raised inside greenlets,
          # which would silently drop the KeyErrors raised above.
          gevent.joinall(jobs, raise_error=True)
      return (commits, tags)
  class GreenThreadsMissingObjectFinder(MissingObjectFinder):
      """Find the objects missing from another object store.

      Same implementation as object_store.MissingObjectFinder
      except we use gevent to parallelize object retrieval.
      """

      def __init__(self, object_store, haves, wants,
                   progress=None, get_tagged=None,
                   concurrency=1, get_parents=None):
          """Compute the initial set of objects to send.

          :param object_store: Object store to read commits and tags from
          :param haves: SHA1s already present on the other side; unknown
              ones are ignored
          :param wants: SHA1s requested by the other side
          :param progress: Optional callable taking one argument; defaults
              to a no-op
          :param get_tagged: Optional callable whose result is stored as
              ``self._tagged`` (an empty dict is used when it is not given
              or returns a falsy value)
          :param concurrency: Size of the gevent pool used for lookups
          :param get_parents: Accepted but not used by this constructor
          """
          self.object_store = object_store
          workers = pool.Pool(size=concurrency)

          # Unknown "have" ids are tolerated (ignore_unknown=True); unknown
          # "want" ids are not.
          have_commits, have_tags = _split_commits_and_tags(
              object_store, haves, True, workers)
          want_commits, want_tags = _split_commits_and_tags(
              object_store, wants, False, workers)

          all_ancestors = object_store._collect_ancestors(have_commits)[0]
          missing_commits, common_commits = object_store._collect_ancestors(
              want_commits, all_ancestors)

          self.sha_done = set()

          def mark_commit_done(commit_sha):
              # Record the commit itself, then hand its tree to
              # _collect_filetree_revs together with the shared sha_done set.
              self.sha_done.add(commit_sha)
              commit = object_store[commit_sha]
              _collect_filetree_revs(object_store, commit.tree, self.sha_done)

          gevent.joinall(
              [workers.spawn(mark_commit_done, c) for c in common_commits])

          # Tags the other side already has never need to be sent.
          self.sha_done.update(have_tags)

          missing_tags = want_tags.difference(have_tags)
          to_send = missing_commits.union(missing_tags)
          self.objects_to_send = {(sha, None, False) for sha in to_send}

          self.progress = (lambda x: None) if progress is None else progress
          self._tagged = (get_tagged() or {}) if get_tagged else {}
  class GreenThreadsObjectStoreIterator(ObjectStoreIterator):
      """ObjectIterator that works on top of an ObjectStore.

      Same implementation as object_store.ObjectStoreIterator
      except we use gevent to parallelize object retrieval.
      """

      def __init__(self, store, shas, finder, concurrency=1):
          """Create the iterator.

          :param store: Object store, passed on to ObjectStoreIterator
          :param shas: SHA1 collection, passed on to ObjectStoreIterator
          :param finder: Object exposing ``objects_to_send`` and ``next``,
              used by ``__len__`` to discover the SHA1s
          :param concurrency: Size of the gevent pool used for lookups
          """
          self.finder = finder
          self.p = pool.Pool(size=concurrency)
          super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)

      def retrieve(self, args):
          """Look up one ``(sha, path)`` pair and return ``(object, path)``."""
          object_id, path = args
          return self.store[object_id], path

      def __iter__(self):
          """Yield ``(object, path)`` pairs fetched through the pool.

          Results come from ``imap_unordered``, so they are not guaranteed to
          follow the order of ``itershas()``.
          """
          fetched = self.p.imap_unordered(self.retrieve, self.itershas())
          for obj, path in fetched:
              yield obj, path

      def __len__(self):
          """Return the number of SHA1s, draining the finder if needed."""
          if len(self._shas) > 0:
              return len(self._shas)
          # Each round spawns one finder.next call per currently pending
          # entry; the loop repeats while entries are still pending.
          while len(self.finder.objects_to_send):
              batch = [self.p.spawn(self.finder.next)
                       for _ in range(len(self.finder.objects_to_send))]
              gevent.joinall(batch)
              self._shas.extend(
                  job.value for job in batch if job.value is not None)
          return len(self._shas)