# test_dumb.py -- Compatibility tests for dumb HTTP git repositories
# Copyright (C) 2025 Dulwich contributors
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Compatibility tests for dumb HTTP git repositories."""
import os
import sys
import tempfile
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from unittest import skipUnless
from dulwich.client import HttpGitClient
from dulwich.porcelain import clone
from dulwich.repo import Repo
from tests.compat.utils import (
CompatTestCase,
rmtree_ro,
run_git_or_fail,
)
class DumbHTTPRequestHandler(SimpleHTTPRequestHandler):
"""HTTP request handler for dumb git protocol."""
def __init__(self, *args, directory=None, **kwargs):
self.directory = directory
super().__init__(*args, directory=directory, **kwargs)
def log_message(self, format, *args):
# Suppress logging during tests
pass
class DumbHTTPGitServer:
"""Simple HTTP server for serving git repositories."""
def __init__(self, root_path, port=0):
self.root_path = root_path
def handler(*args, **kwargs):
return DumbHTTPRequestHandler(*args, directory=root_path, **kwargs)
self.server = HTTPServer(("127.0.0.1", port), handler)
self.server.allow_reuse_address = True
self.port = self.server.server_port
self.thread = None
def start(self):
"""Start the HTTP server in a background thread."""
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.daemon = True
self.thread.start()
# Give the server a moment to start and verify it's listening
import socket
import time
for i in range(50): # Try for up to 5 seconds
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.1)
result = sock.connect_ex(("127.0.0.1", self.port))
sock.close()
if result == 0:
return # Server is ready
except OSError:
pass
time.sleep(0.1)
# If we get here, server failed to start
raise RuntimeError(f"HTTP server failed to start on port {self.port}")
def stop(self):
"""Stop the HTTP server."""
self.server.shutdown()
if self.thread:
self.thread.join()
@property
def url(self):
"""Get the base URL for this server."""
return f"http://127.0.0.1:{self.port}"
class DumbHTTPClientTests(CompatTestCase):
"""Tests for dumb HTTP client against real git repositories."""
def setUp(self):
super().setUp()
# Create a temporary directory for test repos
self.temp_dir = tempfile.mkdtemp()
self.addCleanup(rmtree_ro, self.temp_dir)
# Create origin repository
self.origin_path = os.path.join(self.temp_dir, "origin.git")
os.mkdir(self.origin_path)
run_git_or_fail(["init", "--bare"], cwd=self.origin_path)
# Create a working repository to push from
self.work_path = os.path.join(self.temp_dir, "work")
os.mkdir(self.work_path)
run_git_or_fail(["init"], cwd=self.work_path)
run_git_or_fail(
["config", "user.email", "test@example.com"], cwd=self.work_path
)
run_git_or_fail(["config", "user.name", "Test User"], cwd=self.work_path)
# Create initial commit
test_file = os.path.join(self.work_path, "test.txt")
with open(test_file, "w") as f:
f.write("Hello, world!\n")
run_git_or_fail(["add", "test.txt"], cwd=self.work_path)
run_git_or_fail(["commit", "-m", "Initial commit"], cwd=self.work_path)
# Push to origin
run_git_or_fail(
["remote", "add", "origin", self.origin_path], cwd=self.work_path
)
run_git_or_fail(["push", "origin", "master"], cwd=self.work_path)
# Update server info for dumb HTTP
run_git_or_fail(["update-server-info"], cwd=self.origin_path)
# Start HTTP server
self.server = DumbHTTPGitServer(self.origin_path)
self.server.start()
self.addCleanup(self.server.stop)
@skipUnless(
sys.platform != "win32", "git clone from Python HTTPServer fails on Windows"
)
def test_clone_dumb(self):
dest_path = os.path.join(self.temp_dir, "cloned")
repo = clone(self.server.url, dest_path)
assert b"HEAD" in repo
def test_clone_from_dumb_http(self):
"""Test cloning from a dumb HTTP server."""
dest_path = os.path.join(self.temp_dir, "cloned")
# Use dulwich to clone via dumb HTTP
client = HttpGitClient(self.server.url)
# Create destination repo
dest_repo = Repo.init(dest_path, mkdir=True)
try:
# Fetch from dumb HTTP
def determine_wants(refs):
return [
sha for ref, sha in refs.items() if ref.startswith(b"refs/heads/")
]
result = client.fetch("/", dest_repo, determine_wants=determine_wants)
# Update refs
for ref, sha in result.refs.items():
if ref.startswith(b"refs/heads/"):
dest_repo.refs[ref] = sha
# Checkout files
dest_repo.reset_index()
# Verify the clone
test_file = os.path.join(dest_path, "test.txt")
self.assertTrue(os.path.exists(test_file))
with open(test_file) as f:
self.assertEqual("Hello, world!\n", f.read())
finally:
# Ensure repo is closed before cleanup
dest_repo.close()
@skipUnless(
sys.platform != "win32", "git clone from Python HTTPServer fails on Windows"
)
def test_fetch_new_commit_from_dumb_http(self):
"""Test fetching new commits from a dumb HTTP server."""
# First clone the repository
dest_path = os.path.join(self.temp_dir, "cloned")
run_git_or_fail(["clone", self.server.url, dest_path])
# Make a new commit in the origin
test_file2 = os.path.join(self.work_path, "test2.txt")
with open(test_file2, "w") as f:
f.write("Second file\n")
run_git_or_fail(["add", "test2.txt"], cwd=self.work_path)
run_git_or_fail(["commit", "-m", "Second commit"], cwd=self.work_path)
run_git_or_fail(["push", "origin", "master"], cwd=self.work_path)
# Update server info again
run_git_or_fail(["update-server-info"], cwd=self.origin_path)
# Fetch with dulwich client
client = HttpGitClient(self.server.url)
dest_repo = Repo(dest_path)
try:
old_refs = dest_repo.get_refs()
def determine_wants(refs):
wants = []
for ref, sha in refs.items():
if ref.startswith(b"refs/heads/") and sha != old_refs.get(ref):
wants.append(sha)
return wants
result = client.fetch("/", dest_repo, determine_wants=determine_wants)
# Update refs
for ref, sha in result.refs.items():
if ref.startswith(b"refs/heads/"):
dest_repo.refs[ref] = sha
# Reset to new commit
dest_repo.reset_index()
# Verify the new file exists
test_file2_dest = os.path.join(dest_path, "test2.txt")
self.assertTrue(os.path.exists(test_file2_dest))
with open(test_file2_dest) as f:
self.assertEqual("Second file\n", f.read())
finally:
# Ensure repo is closed before cleanup
dest_repo.close()
@skipUnless(
os.name == "posix", "Skipping on non-POSIX systems due to permission handling"
)
def test_fetch_from_dumb_http_with_tags(self):
"""Test fetching tags from a dumb HTTP server."""
# Create a tag in origin
run_git_or_fail(["tag", "-a", "v1.0", "-m", "Version 1.0"], cwd=self.work_path)
run_git_or_fail(["push", "origin", "v1.0"], cwd=self.work_path)
# Update server info
run_git_or_fail(["update-server-info"], cwd=self.origin_path)
# Clone with dulwich
dest_path = os.path.join(self.temp_dir, "cloned_with_tags")
dest_repo = Repo.init(dest_path, mkdir=True)
try:
client = HttpGitClient(self.server.url)
def determine_wants(refs):
return [
sha
for ref, sha in refs.items()
if ref.startswith((b"refs/heads/", b"refs/tags/"))
]
result = client.fetch("/", dest_repo, determine_wants=determine_wants)
# Update refs
for ref, sha in result.refs.items():
dest_repo.refs[ref] = sha
# Check that the tag exists
self.assertIn(b"refs/tags/v1.0", dest_repo.refs)
# Verify tag points to the right commit
tag_sha = dest_repo.refs[b"refs/tags/v1.0"]
tag_obj = dest_repo[tag_sha]
self.assertEqual(b"tag", tag_obj.type_name)
finally:
# Ensure repo is closed before cleanup
dest_repo.close()