123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- # test_client.py -- Compatibilty tests for git client.
- # Copyright (C) 2010 Google, Inc.
- #
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as public by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Compatibilty tests between the Dulwich client and the cgit server."""
- from contextlib import closing
- import copy
- from io import BytesIO
- import os
- import select
- import signal
- import subprocess
- import sys
- import tarfile
- import tempfile
- import threading
- try:
- from urlparse import unquote
- except ImportError:
- from urllib.parse import unquote
- try:
- import BaseHTTPServer
- import SimpleHTTPServer
- except ImportError:
- import http.server
- BaseHTTPServer = http.server
- SimpleHTTPServer = http.server
- if sys.platform == 'win32':
- import ctypes
- from dulwich import (
- client,
- errors,
- file,
- index,
- protocol,
- objects,
- repo,
- )
- from dulwich.tests import (
- SkipTest,
- expectedFailure,
- )
- from dulwich.tests.compat.utils import (
- CompatTestCase,
- check_for_daemon,
- import_repo_to_dir,
- rmtree_ro,
- run_git_or_fail,
- _DEFAULT_GIT,
- )
- class DulwichClientTestBase(object):
- """Tests for client/server compatibility."""
- def setUp(self):
- self.gitroot = os.path.dirname(import_repo_to_dir('server_new.export').rstrip(os.sep))
- self.dest = os.path.join(self.gitroot, 'dest')
- file.ensure_dir_exists(self.dest)
- run_git_or_fail(['init', '--quiet', '--bare'], cwd=self.dest)
- def tearDown(self):
- rmtree_ro(self.gitroot)
- def assertDestEqualsSrc(self):
- repo_dir = os.path.join(self.gitroot, 'server_new.export')
- dest_repo_dir = os.path.join(self.gitroot, 'dest')
- with closing(repo.Repo(repo_dir)) as src:
- with closing(repo.Repo(dest_repo_dir)) as dest:
- self.assertReposEqual(src, dest)
- def _client(self):
- raise NotImplementedError()
- def _build_path(self):
- raise NotImplementedError()
- def _do_send_pack(self):
- c = self._client()
- srcpath = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(srcpath)) as src:
- sendrefs = dict(src.get_refs())
- del sendrefs[b'HEAD']
- c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs,
- src.object_store.generate_pack_contents)
- def test_send_pack(self):
- self._do_send_pack()
- self.assertDestEqualsSrc()
- def test_send_pack_nothing_to_send(self):
- self._do_send_pack()
- self.assertDestEqualsSrc()
- # nothing to send, but shouldn't raise either.
- self._do_send_pack()
- def test_send_without_report_status(self):
- c = self._client()
- c._send_capabilities.remove(b'report-status')
- srcpath = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(srcpath)) as src:
- sendrefs = dict(src.get_refs())
- del sendrefs[b'HEAD']
- c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs,
- src.object_store.generate_pack_contents)
- self.assertDestEqualsSrc()
- def make_dummy_commit(self, dest):
- b = objects.Blob.from_string(b'hi')
- dest.object_store.add_object(b)
- t = index.commit_tree(dest.object_store, [(b'hi', b.id, 0o100644)])
- c = objects.Commit()
- c.author = c.committer = b'Foo Bar <foo@example.com>'
- c.author_time = c.commit_time = 0
- c.author_timezone = c.commit_timezone = 0
- c.message = b'hi'
- c.tree = t
- dest.object_store.add_object(c)
- return c.id
- def disable_ff_and_make_dummy_commit(self):
- # disable non-fast-forward pushes to the server
- dest = repo.Repo(os.path.join(self.gitroot, 'dest'))
- run_git_or_fail(['config', 'receive.denyNonFastForwards', 'true'],
- cwd=dest.path)
- commit_id = self.make_dummy_commit(dest)
- return dest, commit_id
- def compute_send(self, src):
- sendrefs = dict(src.get_refs())
- del sendrefs[b'HEAD']
- return sendrefs, src.object_store.generate_pack_contents
- def test_send_pack_one_error(self):
- dest, dummy_commit = self.disable_ff_and_make_dummy_commit()
- dest.refs[b'refs/heads/master'] = dummy_commit
- repo_dir = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(repo_dir)) as src:
- sendrefs, gen_pack = self.compute_send(src)
- c = self._client()
- try:
- c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack)
- except errors.UpdateRefsError as e:
- self.assertEqual('refs/heads/master failed to update',
- e.args[0])
- self.assertEqual({b'refs/heads/branch': b'ok',
- b'refs/heads/master': b'non-fast-forward'},
- e.ref_status)
- def test_send_pack_multiple_errors(self):
- dest, dummy = self.disable_ff_and_make_dummy_commit()
- # set up for two non-ff errors
- branch, master = b'refs/heads/branch', b'refs/heads/master'
- dest.refs[branch] = dest.refs[master] = dummy
- repo_dir = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(repo_dir)) as src:
- sendrefs, gen_pack = self.compute_send(src)
- c = self._client()
- try:
- c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack)
- except errors.UpdateRefsError as e:
- self.assertIn(str(e),
- ['{0}, {1} failed to update'.format(
- branch.decode('ascii'), master.decode('ascii')),
- '{1}, {0} failed to update'.format(
- branch.decode('ascii'), master.decode('ascii'))])
- self.assertEqual({branch: b'non-fast-forward',
- master: b'non-fast-forward'},
- e.ref_status)
- def test_archive(self):
- c = self._client()
- f = BytesIO()
- c.archive(self._build_path(b'/server_new.export'), b'HEAD', f.write)
- f.seek(0)
- tf = tarfile.open(fileobj=f)
- self.assertEqual(['baz', 'foo'], tf.getnames())
- def test_fetch_pack(self):
- c = self._client()
- with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest:
- refs = c.fetch(self._build_path(b'/server_new.export'), dest)
- for r in refs.items():
- dest.refs.set_if_equals(r[0], None, r[1])
- self.assertDestEqualsSrc()
- def test_incremental_fetch_pack(self):
- self.test_fetch_pack()
- dest, dummy = self.disable_ff_and_make_dummy_commit()
- dest.refs[b'refs/heads/master'] = dummy
- c = self._client()
- repo_dir = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(repo_dir)) as dest:
- refs = c.fetch(self._build_path(b'/dest'), dest)
- for r in refs.items():
- dest.refs.set_if_equals(r[0], None, r[1])
- self.assertDestEqualsSrc()
- def test_fetch_pack_no_side_band_64k(self):
- c = self._client()
- c._fetch_capabilities.remove(b'side-band-64k')
- with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest:
- refs = c.fetch(self._build_path(b'/server_new.export'), dest)
- for r in refs.items():
- dest.refs.set_if_equals(r[0], None, r[1])
- self.assertDestEqualsSrc()
- def test_fetch_pack_zero_sha(self):
- # zero sha1s are already present on the client, and should
- # be ignored
- c = self._client()
- with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest:
- refs = c.fetch(self._build_path(b'/server_new.export'), dest,
- lambda refs: [protocol.ZERO_SHA])
- for r in refs.items():
- dest.refs.set_if_equals(r[0], None, r[1])
- def test_send_remove_branch(self):
- with closing(repo.Repo(os.path.join(self.gitroot, 'dest'))) as dest:
- dummy_commit = self.make_dummy_commit(dest)
- dest.refs[b'refs/heads/master'] = dummy_commit
- dest.refs[b'refs/heads/abranch'] = dummy_commit
- sendrefs = dict(dest.refs)
- sendrefs[b'refs/heads/abranch'] = b"00" * 20
- del sendrefs[b'HEAD']
- gen_pack = lambda have, want: []
- c = self._client()
- self.assertEqual(dest.refs[b"refs/heads/abranch"], dummy_commit)
- c.send_pack(self._build_path(b'/dest'), lambda _: sendrefs, gen_pack)
- self.assertFalse(b"refs/heads/abranch" in dest.refs)
- def test_get_refs(self):
- c = self._client()
- refs = c.get_refs(self._build_path(b'/server_new.export'))
- repo_dir = os.path.join(self.gitroot, 'server_new.export')
- with closing(repo.Repo(repo_dir)) as dest:
- self.assertDictEqual(dest.refs.as_dict(), refs)
- class DulwichTCPClientTest(CompatTestCase, DulwichClientTestBase):
- def setUp(self):
- CompatTestCase.setUp(self)
- DulwichClientTestBase.setUp(self)
- if check_for_daemon(limit=1):
- raise SkipTest('git-daemon was already running on port %s' %
- protocol.TCP_GIT_PORT)
- fd, self.pidfile = tempfile.mkstemp(prefix='dulwich-test-git-client',
- suffix=".pid")
- os.fdopen(fd).close()
- args = [_DEFAULT_GIT, 'daemon', '--verbose', '--export-all',
- '--pid-file=%s' % self.pidfile,
- '--base-path=%s' % self.gitroot,
- '--enable=receive-pack', '--enable=upload-archive',
- '--listen=localhost', '--reuseaddr',
- self.gitroot]
- self.process = subprocess.Popen(
- args, cwd=self.gitroot,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- if not check_for_daemon():
- raise SkipTest('git-daemon failed to start')
- def tearDown(self):
- with open(self.pidfile) as f:
- pid = int(f.read().strip())
- if sys.platform == 'win32':
- PROCESS_TERMINATE = 1
- handle = ctypes.windll.kernel32.OpenProcess(
- PROCESS_TERMINATE, False, pid)
- ctypes.windll.kernel32.TerminateProcess(handle, -1)
- ctypes.windll.kernel32.CloseHandle(handle)
- else:
- try:
- os.kill(pid, signal.SIGKILL)
- os.unlink(self.pidfile)
- except (OSError, IOError):
- pass
- self.process.wait()
- self.process.stdout.close()
- self.process.stderr.close()
- DulwichClientTestBase.tearDown(self)
- CompatTestCase.tearDown(self)
- def _client(self):
- return client.TCPGitClient('localhost')
- def _build_path(self, path):
- return path
- if sys.platform == 'win32':
- @expectedFailure
- def test_fetch_pack_no_side_band_64k(self):
- DulwichClientTestBase.test_fetch_pack_no_side_band_64k(self)
- class TestSSHVendor(object):
- @staticmethod
- def run_command(host, command, username=None, port=None):
- cmd, path = command.split(b' ')
- cmd = cmd.split(b'-', 1)
- path = path.replace(b"'", b"")
- p = subprocess.Popen(cmd + [path], bufsize=0, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- return client.SubprocessWrapper(p)
- class DulwichMockSSHClientTest(CompatTestCase, DulwichClientTestBase):
- def setUp(self):
- CompatTestCase.setUp(self)
- DulwichClientTestBase.setUp(self)
- self.real_vendor = client.get_ssh_vendor
- client.get_ssh_vendor = TestSSHVendor
- def tearDown(self):
- DulwichClientTestBase.tearDown(self)
- CompatTestCase.tearDown(self)
- client.get_ssh_vendor = self.real_vendor
- def _client(self):
- return client.SSHGitClient('localhost')
- def _build_path(self, path):
- return self.gitroot.encode(sys.getfilesystemencoding()) + path
- class DulwichSubprocessClientTest(CompatTestCase, DulwichClientTestBase):
- def setUp(self):
- CompatTestCase.setUp(self)
- DulwichClientTestBase.setUp(self)
- def tearDown(self):
- DulwichClientTestBase.tearDown(self)
- CompatTestCase.tearDown(self)
- def _client(self):
- return client.SubprocessGitClient(stderr=subprocess.PIPE)
- def _build_path(self, path):
- return self.gitroot.encode(sys.getfilesystemencoding()) + path
- class GitHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
- """HTTP Request handler that calls out to 'git http-backend'."""
- # Make rfile unbuffered -- we need to read one line and then pass
- # the rest to a subprocess, so we can't use buffered input.
- rbufsize = 0
- def do_POST(self):
- self.run_backend()
- def do_GET(self):
- self.run_backend()
- def send_head(self):
- return self.run_backend()
- def log_request(self, code='-', size='-'):
- # Let's be quiet, the test suite is noisy enough already
- pass
- def run_backend(self):
- """Call out to git http-backend."""
- # Based on CGIHTTPServer.CGIHTTPRequestHandler.run_cgi:
- # Copyright (c) 2001-2010 Python Software Foundation; All Rights Reserved
- # Licensed under the Python Software Foundation License.
- rest = self.path
- # find an explicit query string, if present.
- i = rest.rfind('?')
- if i >= 0:
- rest, query = rest[:i], rest[i+1:]
- else:
- query = ''
- env = copy.deepcopy(os.environ)
- env['SERVER_SOFTWARE'] = self.version_string()
- env['SERVER_NAME'] = self.server.server_name
- env['GATEWAY_INTERFACE'] = 'CGI/1.1'
- env['SERVER_PROTOCOL'] = self.protocol_version
- env['SERVER_PORT'] = str(self.server.server_port)
- env['GIT_PROJECT_ROOT'] = self.server.root_path
- env["GIT_HTTP_EXPORT_ALL"] = "1"
- env['REQUEST_METHOD'] = self.command
- uqrest = unquote(rest)
- env['PATH_INFO'] = uqrest
- env['SCRIPT_NAME'] = "/"
- if query:
- env['QUERY_STRING'] = query
- host = self.address_string()
- if host != self.client_address[0]:
- env['REMOTE_HOST'] = host
- env['REMOTE_ADDR'] = self.client_address[0]
- authorization = self.headers.get("authorization")
- if authorization:
- authorization = authorization.split()
- if len(authorization) == 2:
- import base64, binascii
- env['AUTH_TYPE'] = authorization[0]
- if authorization[0].lower() == "basic":
- try:
- authorization = base64.decodestring(authorization[1])
- except binascii.Error:
- pass
- else:
- authorization = authorization.split(':')
- if len(authorization) == 2:
- env['REMOTE_USER'] = authorization[0]
- # XXX REMOTE_IDENT
- env['CONTENT_TYPE'] = self.headers.get('content-type')
- length = self.headers.get('content-length')
- if length:
- env['CONTENT_LENGTH'] = length
- referer = self.headers.get('referer')
- if referer:
- env['HTTP_REFERER'] = referer
- accept = []
- for line in self.headers.getallmatchingheaders('accept'):
- if line[:1] in "\t\n\r ":
- accept.append(line.strip())
- else:
- accept = accept + line[7:].split(',')
- env['HTTP_ACCEPT'] = ','.join(accept)
- ua = self.headers.get('user-agent')
- if ua:
- env['HTTP_USER_AGENT'] = ua
- co = self.headers.get('cookie')
- if co:
- env['HTTP_COOKIE'] = co
- # XXX Other HTTP_* headers
- # Since we're setting the env in the parent, provide empty
- # values to override previously set values
- for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
- 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
- env.setdefault(k, "")
- self.wfile.write(b"HTTP/1.1 200 Script output follows\r\n")
- self.wfile.write(
- ("Server: %s\r\n" % self.server.server_name).encode('ascii'))
- self.wfile.write(
- ("Date: %s\r\n" % self.date_time_string()).encode('ascii'))
- decoded_query = query.replace('+', ' ')
- try:
- nbytes = int(length)
- except (TypeError, ValueError):
- nbytes = 0
- if self.command.lower() == "post" and nbytes > 0:
- data = self.rfile.read(nbytes)
- else:
- data = None
- # throw away additional data [see bug #427345]
- while select.select([self.rfile._sock], [], [], 0)[0]:
- if not self.rfile._sock.recv(1):
- break
- args = ['http-backend']
- if '=' not in decoded_query:
- args.append(decoded_query)
- stdout = run_git_or_fail(args, input=data, env=env, stderr=subprocess.PIPE)
- self.wfile.write(stdout)
- class HTTPGitServer(BaseHTTPServer.HTTPServer):
- allow_reuse_address = True
- def __init__(self, server_address, root_path):
- BaseHTTPServer.HTTPServer.__init__(self, server_address, GitHTTPRequestHandler)
- self.root_path = root_path
- self.server_name = "localhost"
- def get_url(self):
- return 'http://%s:%s/' % (self.server_name, self.server_port)
- class DulwichHttpClientTest(CompatTestCase, DulwichClientTestBase):
- min_git_version = (1, 7, 0, 2)
- def setUp(self):
- CompatTestCase.setUp(self)
- DulwichClientTestBase.setUp(self)
- self._httpd = HTTPGitServer(("localhost", 0), self.gitroot)
- self.addCleanup(self._httpd.shutdown)
- threading.Thread(target=self._httpd.serve_forever).start()
- run_git_or_fail(['config', 'http.uploadpack', 'true'],
- cwd=self.dest)
- run_git_or_fail(['config', 'http.receivepack', 'true'],
- cwd=self.dest)
- def tearDown(self):
- DulwichClientTestBase.tearDown(self)
- CompatTestCase.tearDown(self)
- self._httpd.shutdown()
- self._httpd.socket.close()
- def _client(self):
- return client.HttpGitClient(self._httpd.get_url())
- def _build_path(self, path):
- if sys.version_info[0] == 3:
- return path.decode('ascii')
- else:
- return path
- def test_archive(self):
- raise SkipTest("exporting archives not supported over http")
|