Sfoglia il codice sorgente

Add dulwich.greenthreads module.

Fabien Boucher 11 anni fa
parent
commit
7445fc9c9e
4 ha cambiato i file con 282 aggiunte e 2 eliminazioni
  1. 4 0
      NEWS
  2. 141 0
      dulwich/greenthreads.py
  3. 3 2
      dulwich/tests/__init__.py
  4. 134 0
      dulwich/tests/test_greenthreads.py

+ 4 - 0
NEWS

@@ -11,6 +11,10 @@
 
   * Add porcelain 'status'. (Ryan Faulkner)
 
+  * Add `dulwich.greenthreads` module which provides support
+    for concurrency of some object store operations.
+    (Fabien Boucher)
+
 0.9.6	2014-04-23
 
  IMPROVEMENTS

+ 141 - 0
dulwich/greenthreads.py

@@ -0,0 +1,141 @@
+# greenthreads.py -- Utility module for querying an ObjectStore with gevent
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Fabien Boucher <fabien.boucher@enovance.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; version 2
+# of the License or (at your option) any later version of
+# the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA  02110-1301, USA.
+
+"""Utility module for querying an ObjectStore with gevent."""
+
+import gevent
+from gevent import pool
+
+from dulwich.objects import (
+    Commit,
+    Tag,
+    )
+from dulwich.object_store import (
+    MissingObjectFinder,
+    _collect_filetree_revs,
+    ObjectStoreIterator,
+    )
+
+
+def _split_commits_and_tags(obj_store, lst,
+                            ignore_unknown=False, pool=None):
+    """Split object id list into two list with commit SHA1s and tag SHA1s.
+
+    Same implementation as object_store._split_commits_and_tags
+    except we use gevent to parallelize object retrieval.
+    """
+    commits = set()
+    tags = set()
+
+    def find_commit_type(sha):
+        try:
+            o = obj_store[sha]
+        except KeyError:
+            if not ignore_unknown:
+                raise
+        else:
+            if isinstance(o, Commit):
+                commits.add(sha)
+            elif isinstance(o, Tag):
+                tags.add(sha)
+                commits.add(o.object[1])
+            else:
+                raise KeyError('Not a commit or a tag: %s' % sha)
+    jobs = [pool.spawn(find_commit_type, s) for s in lst]
+    gevent.joinall(jobs)
+    return (commits, tags)
+
+
+class GreenThreadsMissingObjectFinder(MissingObjectFinder):
+    """Find the objects missing from another object store.
+
+    Same implementation as object_store.MissingObjectFinder
+    except we use gevent to parallelize object retrieval.
+    """
+    def __init__(self, object_store, haves, wants,
+                 progress=None, get_tagged=None,
+                 concurrency=1, get_parents=None):
+
+        def collect_tree_sha(sha):
+            self.sha_done.add(sha)
+            cmt = object_store[sha]
+            _collect_filetree_revs(object_store, cmt.tree, self.sha_done)
+
+        self.object_store = object_store
+        p = pool.Pool(size=concurrency)
+
+        have_commits, have_tags = \
+            _split_commits_and_tags(object_store, haves,
+                                    True, p)
+        want_commits, want_tags = \
+            _split_commits_and_tags(object_store, wants,
+                                    False, p)
+        all_ancestors = object_store._collect_ancestors(have_commits)[0]
+        missing_commits, common_commits = \
+            object_store._collect_ancestors(want_commits, all_ancestors)
+
+        self.sha_done = set()
+        jobs = [p.spawn(collect_tree_sha, c) for c in common_commits]
+        gevent.joinall(jobs)
+        for t in have_tags:
+            self.sha_done.add(t)
+        missing_tags = want_tags.difference(have_tags)
+        wants = missing_commits.union(missing_tags)
+        self.objects_to_send = set([(w, None, False) for w in wants])
+        if progress is None:
+            self.progress = lambda x: None
+        else:
+            self.progress = progress
+        self._tagged = get_tagged and get_tagged() or {}
+
+
+class GreenThreadsObjectStoreIterator(ObjectStoreIterator):
+    """ObjectIterator that works on top of an ObjectStore.
+
+    Same implementation as object_store.ObjectStoreIterator
+    except we use gevent to parallelize object retrieval.
+    """
+    def __init__(self, store, shas, finder, concurrency=1):
+        self.finder = finder
+        self.p = pool.Pool(size=concurrency)
+        super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)
+
+    def retrieve(self, args):
+        sha, path = args
+        return self.store[sha], path
+
+    def __iter__(self):
+        for sha, path in self.p.imap_unordered(self.retrieve,
+                                               self.itershas()):
+            yield sha, path
+
+    def __len__(self):
+        if len(self._shas) > 0:
+            return len(self._shas)
+        while len(self.finder.objects_to_send):
+            jobs = []
+            for _ in xrange(0, len(self.finder.objects_to_send)):
+                jobs.append(self.p.spawn(self.finder.next))
+            gevent.joinall(jobs)
+            for j in jobs:
+                if j.value is not None:
+                    self._shas.append(j.value)
+        return len(self._shas)

+ 3 - 2
dulwich/tests/__init__.py

@@ -30,10 +30,10 @@ import tempfile
 if sys.version_info >= (2, 7):
     # If Python itself provides an exception, use that
     import unittest
-    from unittest import SkipTest, TestCase as _TestCase
+    from unittest import SkipTest, skipIf, TestCase as _TestCase
 else:
     import unittest2 as unittest
-    from unittest2 import SkipTest, TestCase as _TestCase
+    from unittest2 import SkipTest, skipIf, TestCase as _TestCase
 
 
 def get_safe_env(env=None):
@@ -118,6 +118,7 @@ def self_test_suite():
         'fastexport',
         'file',
         'grafts',
+        'greenthreads',
         'hooks',
         'index',
         'lru_cache',

+ 134 - 0
dulwich/tests/test_greenthreads.py

@@ -0,0 +1,134 @@
+# test_greenthreads.py -- Unittests for eventlet.
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Fabien Boucher <fabien.boucher@enovance.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; version 2
+# of the License or (at your option) any later version of
+# the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA  02110-1301, USA.
+
+import time
+
+from dulwich.tests import (
+    TestCase,
+    skipIf,
+    )
+from dulwich.object_store import (
+    MemoryObjectStore,
+    MissingObjectFinder,
+    )
+from dulwich.objects import (
+    Commit,
+    Blob,
+    Tree,
+    parse_timezone,
+    )
+
+try:
+    import gevent
+    gevent_support = True
+except ImportError:
+    gevent_support = False
+
+if gevent_support:
+    from dulwich.greenthreads import (
+        GreenThreadsObjectStoreIterator,
+        GreenThreadsMissingObjectFinder,
+    )
+
+skipmsg = "Gevent library is not installed"
+
+def create_commit(marker=None):
+    blob = Blob.from_string('The blob content %s' % marker)
+    tree = Tree()
+    tree.add("thefile %s" % marker, 0100644, blob.id)
+    cmt = Commit()
+    cmt.tree = tree.id
+    cmt.author = cmt.committer = "John Doe <john@doe.net>"
+    cmt.message = "%s" % marker
+    tz = parse_timezone('-0200')[0]
+    cmt.commit_time = cmt.author_time = int(time.time())
+    cmt.commit_timezone = cmt.author_timezone = tz
+    return cmt, tree, blob
+
+
+def init_store(store, count=1):
+    ret = []
+    for i in xrange(0, count):
+        objs = create_commit(marker=i)
+        for obj in objs:
+            ret.append(obj)
+            store.add_object(obj)
+    return ret
+
+
+@skipIf(not gevent_support, skipmsg)
+class TestGreenThreadsObjectStoreIterator(TestCase):
+
+    def setUp(self):
+        super(TestGreenThreadsObjectStoreIterator, self).setUp()
+        self.store = MemoryObjectStore()
+        self.cmt_amount = 10
+        self.objs = init_store(self.store, self.cmt_amount)
+
+    def test_len(self):
+        wants = [sha.id for sha in self.objs if isinstance(sha, Commit)]
+        finder = MissingObjectFinder(self.store, (), wants)
+        iterator = GreenThreadsObjectStoreIterator(self.store,
+                                               iter(finder.next, None),
+                                               finder)
+        # One commit refers one tree and one blob
+        self.assertEqual(len(iterator), self.cmt_amount * 3)
+        haves = wants[0:self.cmt_amount-1]
+        finder = MissingObjectFinder(self.store, haves, wants)
+        iterator = GreenThreadsObjectStoreIterator(self.store,
+                                               iter(finder.next, None),
+                                               finder)
+        self.assertEqual(len(iterator), 3)
+
+    def test_iter(self):
+        wants = [sha.id for sha in self.objs if isinstance(sha, Commit)]
+        finder = MissingObjectFinder(self.store, (), wants)
+        iterator = GreenThreadsObjectStoreIterator(self.store,
+                                               iter(finder.next, None),
+                                               finder)
+        objs = []
+        for sha, path in iterator:
+            self.assertIn(sha, self.objs)
+            objs.append(sha)
+        self.assertEqual(len(objs), len(self.objs))
+
+
+@skipIf(not gevent_support, skipmsg)
+class TestGreenThreadsMissingObjectFinder(TestCase):
+
+    def setUp(self):
+        super(TestGreenThreadsMissingObjectFinder, self).setUp()
+        self.store = MemoryObjectStore()
+        self.cmt_amount = 10
+        self.objs = init_store(self.store, self.cmt_amount)
+
+    def test_finder(self):
+        wants = [sha.id for sha in self.objs if isinstance(sha, Commit)]
+        finder = GreenThreadsMissingObjectFinder(self.store, (), wants)
+        self.assertEqual(len(finder.sha_done), 0)
+        self.assertEqual(len(finder.objects_to_send), self.cmt_amount)
+
+        finder = GreenThreadsMissingObjectFinder(self.store,
+                                             wants[0:self.cmt_amount/2],
+                                             wants)
+        # sha_done will contains commit id and sha of blob refered in tree
+        self.assertEqual(len(finder.sha_done), (self.cmt_amount/2)*2)
+        self.assertEqual(len(finder.objects_to_send), self.cmt_amount/2)