# test_paramiko_vendor.py # # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for paramiko_vendor.""" import os import socket import tempfile import threading import time from io import StringIO from typing import Optional from unittest import skipIf from unittest.mock import patch from .. import TestCase try: import paramiko except ImportError: has_paramiko = False else: has_paramiko = True import paramiko.transport from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor class Server(paramiko.ServerInterface): """http://docs.paramiko.org/en/2.4/api/server.html.""" def __init__(self, commands, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.commands = commands def check_channel_exec_request(self, channel, command) -> bool: self.commands.append(command) return True def check_auth_password(self, username, password): if username == USER and password == PASSWORD: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY)) if username == USER and key == pubkey: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_channel_request(self, kind, chanid): if kind == "session": return paramiko.OPEN_SUCCEEDED return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED def get_allowed_auths(self, username) -> str: return "password,publickey" class SSHServer: """A real SSH server using Paramiko that listens on a TCP port.""" def __init__(self): self.commands = [] self.server_socket = None self.server_thread = None self.host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY)) self.running = False self.connection_threads = [] def start(self): """Start the SSH server on a random port.""" self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(("127.0.0.1", 0)) self.port = self.server_socket.getsockname()[1] self.server_socket.listen(5) self.running = True self.server_thread = threading.Thread(target=self._run_server) self.server_thread.daemon = True self.server_thread.start() # Give the server a moment to start up time.sleep(0.1) def stop(self): """Stop the SSH server.""" self.running = False if self.server_socket: try: self.server_socket.close() except OSError: pass # Clean up connection threads for thread in self.connection_threads: if thread.is_alive(): thread.join(timeout=1) if self.server_thread and self.server_thread.is_alive(): self.server_thread.join(timeout=5) def _run_server(self): """Main server loop.""" self.server_socket.settimeout( 1.0 ) # Allow checking self.running periodically while self.running: try: client_socket, addr = self.server_socket.accept() # Handle each connection in a separate thread conn_thread = threading.Thread( target=self._handle_connection, args=(client_socket,) ) conn_thread.daemon = True conn_thread.start() self.connection_threads.append(conn_thread) except socket.timeout: # Normal timeout, continue to check if we should keep running continue except OSError as e: # Socket was closed, exit gracefully if not self.running: break # Otherwise re-raise the error raise e def _handle_connection(self, client_socket): """Handle a single SSH connection.""" transport = None try: transport = paramiko.Transport(client_socket) transport.add_server_key(self.host_key) server = Server(self.commands) transport.start_server(server=server) # Wait for channel requests and handle them while self.running and transport.is_active(): channel = transport.accept(1) if channel is None: continue # Handle channel in a separate thread to allow multiple channels channel_thread = threading.Thread( target=self._handle_channel, args=(channel,) ) channel_thread.daemon = True channel_thread.start() except paramiko.SSHException as e: print(f"SSH error in connection handler: {e}") except OSError as e: print(f"Socket error in connection handler: {e}") finally: if transport: transport.close() if client_socket: client_socket.close() def _handle_channel(self, channel): """Handle a single SSH channel - echo server.""" try: # Set channel to blocking mode channel.setblocking(True) channel.settimeout(10.0) # Read all available data and echo it back while True: try: data = channel.recv(4096) if not data: break # Echo the data back immediately channel.send(data) except socket.timeout: # No more data available, break break except paramiko.SSHException as e: print(f"SSH error in channel handler: {e}") except OSError as e: print(f"Socket error in channel handler: {e}") finally: try: channel.close() except OSError: pass USER = "testuser" PASSWORD = "test" SERVER_KEY = """\ -----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAy/L1sSYAzxsMprtNXW4u/1jGXXkQmQ2xtmKVlR+RlIL3a1BH bzTpPlZyjltAAwzIP8XRh0iJFKz5y3zSQChhX47ZGN0NvQsVct8R+YwsUonwfAJ+ JN0KBKKvC8fPHlzqBr3gX+ZxqsFH934tQ6wdQPH5eQWtdM8L826lMsH1737uyTGk +mCSDjL3c6EzY83g7qhkJU2R4qbi6ne01FaWADzG8sOzXnHT+xpxtk8TTT8yCVUY MmBNsSoA/ka3iWz70ghB+6Xb0WpFJZXWq1oYovviPAfZGZSrxBZMxsWMye70SdLl TqsBEt0+miIcm9s0fvjWvQuhaHX6mZs5VO4r5QIDAQABAoIBAGYqeYWaYgFdrYLA hUrubUCg+g3NHdFuGL4iuIgRXl4lFUh+2KoOuWDu8Uf60iA1AQNhV0sLvQ/Mbv3O s4xMLisuZfaclctDiCUZNenqnDFkxEF7BjH1QJV94W5nU4wEQ3/JEmM4D2zYkfKb FJW33JeyH6TOgUvohDYYEU1R+J9V8qA243p+ui1uVtNI6Pb0TXJnG5y9Ny4vkSWH Fi0QoMPR1r9xJ4SEearGzA/crb4SmmDTKhGSoMsT3d5ATieLmwcS66xWz8w4oFGJ yzDq24s4Fp9ccNjMf/xR8XRiekJv835gjEqwF9IXyvgOaq6XJ1iCqGPFDKa25nui JnEstOkCgYEA/ZXk7aIanvdeJlTqpX578sJfCnrXLydzE8emk1b7+5mrzGxQ4/pM PBQs2f8glT3t0O0mRX9NoRqnwrid88/b+cY4NCOICFZeasX336/gYQxyVeRLJS6Z hnGEQqry8qS7PdKAyeHMNmZFrUh4EiHiObymEfQS+mkRUObn0cGBTw8CgYEAzeQU D2baec1DawjppKaRynAvWjp+9ry1lZx9unryKVRwjRjkEpw+b3/+hdaF1IvsVSce cNj+6W2guZ2tyHuPhZ64/4SJVyE2hKDSKD4xTb2nVjsMeN0bLD2UWXC9mwbx8nWa 2tmtUZ7a/okQb2cSdosJinRewLNqXIsBXamT1csCgYEA0cXb2RCOQQ6U3dTFPx4A 3vMXuA2iUKmrsqMoEx6T2LBow/Sefdkik1iFOdipVYwjXP+w9zC2QR1Rxez/DR/X 8ymceNUjxPHdrSoTQQG29dFcC92MpDeGXQcuyA+uZjcLhbrLOzYEvsOfxBb87NMG 14hNQPDNekTMREafYo9WrtUCgYAREK54+FVzcwf7fymedA/xb4r9N4v+d3W1iNsC 8d3Qfyc1CrMct8aVB07ZWQaOr2pPRIbJY7L9NhD0UZVt4I/sy1MaGqonhqE2LP4+ R6legDG2e/50ph7yc8gwAaA1kUXMiuLi8Nfkw/3yyvmJwklNegi4aRzRbA2Mzhi2 4q9WMQKBgQCb0JNyxHG4pvLWCF/j0Sm1FfvrpnqSv5678n1j4GX7Ka/TubOK1Y4K U+Oib7dKa/zQMWehVFNTayrsq6bKVZ6q7zG+IHiRLw4wjeAxREFH6WUjDrn9vl2l D48DKbBuBwuVOJWyq3qbfgJXojscgNQklrsPdXVhDwOF0dYxP89HnA== -----END RSA PRIVATE KEY-----""" CLIENT_KEY = """\ -----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAxvREKSElPOm/0z/nPO+j5rk2tjdgGcGc7We1QZ6TRXYLu7nN GeEFIL4p8N1i6dmB+Eydt7xqCU79MWD6Yy4prFe1+/K1wCDUxIbFMxqQcX5zjJzd i8j8PbcaUlVhP/OkjtkSxrXaGDO1BzfdV4iEBtTV/2l3zmLKJlt3jnOHLczP24CB DTQKp3rKshbRefzot9Y+wnaK692RsYgsyo9YEP0GyWKG9topCHk13r46J6vGLeuj ryUKqmbLJkzbJbIcEqwTDo5iHaCVqaMr5Hrb8BdMucSseqZQJsXSd+9tdRcIblUQ 38kZjmFMm4SFbruJcpZCNM2wNSZPIRX+3eiwNwIDAQABAoIBAHSacOBSJsr+jIi5 KUOTh9IPtzswVUiDKwARCjB9Sf8p4lKR4N1L/n9kNJyQhApeikgGT2GCMftmqgoo tlculQoHFgemBlOmak0MV8NNzF5YKEy/GzF0CDH7gJfEpoyetVFrdA+2QS5yD6U9 XqKQxiBi2VEqdScmyyeT8AwzNYTnPeH/DOEcnbdRjqiy/CD79F49CQ1lX1Fuqm0K I7BivBH1xo/rVnUP4F+IzocDqoga+Pjdj0LTXIgJlHQDSbhsQqWujWQDDuKb+MAw sNK4Zf8ErV3j1PyA7f/M5LLq6zgstkW4qikDHo4SpZX8kFOO8tjqb7kujj7XqeaB CxqrOTECgYEA73uWkrohcmDJ4KqbuL3tbExSCOUiaIV+sT1eGPNi7GCmXD4eW5Z4 75v2IHymW83lORSu/DrQ6sKr1nkuRpqr2iBzRmQpl/H+wahIhBXlnJ25uUjDsuPO 1Pq2LcmyD+jTxVnmbSe/q7O09gZQw3I6H4+BMHmpbf8tC97lqimzpJ0CgYEA1K0W ZL70Xtn9quyHvbtae/BW07NZnxvUg4UaVIAL9Zu34JyplJzyzbIjrmlDbv6aRogH /KtuG9tfbf55K/jjqNORiuRtzt1hUN1ye4dyW7tHx2/7lXdlqtyK40rQl8P0kqf8 zaS6BqjnobgSdSpg32rWoL/pcBHPdJCJEgQ8zeMCgYEA0/PK8TOhNIzrP1dgGSKn hkkJ9etuB5nW5mEM7gJDFDf6JPupfJ/xiwe6z0fjKK9S57EhqgUYMB55XYnE5iIw ZQ6BV9SAZ4V7VsRs4dJLdNC3tn/rDGHJBgCaym2PlbsX6rvFT+h1IC8dwv0V79Ui Ehq9WTzkMoE8yhvNokvkPZUCgYEAgBAFxv5xGdh79ftdtXLmhnDvZ6S8l6Fjcxqo Ay/jg66Tp43OU226iv/0mmZKM8Dd1xC8dnon4GBVc19jSYYiWBulrRPlx0Xo/o+K CzZBN1lrXH1i6dqufpc0jq8TMf/N+q1q/c1uMupsKCY1/xVYpc+ok71b7J7c49zQ nOeuUW8CgYA9Infooy65FTgbzca0c9kbCUBmcAPQ2ItH3JcPKWPQTDuV62HcT00o fZdIV47Nez1W5Clk191RMy8TXuqI54kocciUWpThc6j44hz49oUueb8U4bLcEHzA WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w== -----END RSA PRIVATE KEY-----""" @skipIf(not has_paramiko, "paramiko is not installed") class ParamikoSSHVendorTests(TestCase): def setUp(self) -> None: self.commands = [] socket.setdefaulttimeout(10) self.addCleanup(socket.setdefaulttimeout, None) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.bind(("127.0.0.1", 0)) self.socket.listen(5) self.addCleanup(self.socket.close) self.port = self.socket.getsockname()[1] self.thread = threading.Thread(target=self._run) self.thread.start() def tearDown(self) -> None: self.thread.join() def _run(self) -> Optional[bool]: try: conn, addr = self.socket.accept() except OSError: return False self.transport = paramiko.Transport(conn) self.addCleanup(self.transport.close) host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY)) self.transport.add_server_key(host_key) server = Server(self.commands) self.transport.start_server(server=server) def test_run_command_password(self) -> None: vendor = ParamikoSSHVendor( allow_agent=False, look_for_keys=False, ) vendor.run_command( "127.0.0.1", "test_run_command_password", username=USER, port=self.port, password=PASSWORD, ) self.assertIn(b"test_run_command_password", self.commands) def test_run_command_with_privkey(self) -> None: key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY)) vendor = ParamikoSSHVendor( allow_agent=False, look_for_keys=False, ) vendor.run_command( "127.0.0.1", "test_run_command_with_privkey", username=USER, port=self.port, pkey=key, ) self.assertIn(b"test_run_command_with_privkey", self.commands) def test_run_command_data_transfer(self) -> None: vendor = ParamikoSSHVendor( allow_agent=False, look_for_keys=False, ) con = vendor.run_command( "127.0.0.1", "test_run_command_data_transfer", username=USER, port=self.port, password=PASSWORD, ) self.assertIn(b"test_run_command_data_transfer", self.commands) channel = self.transport.accept(5) channel.send(b"stdout\n") channel.send_stderr(b"stderr\n") channel.close() # Fixme: it's return false # self.assertTrue(con.can_read()) self.assertEqual(b"stdout\n", con.read(4096)) # Fixme: it's return empty string # self.assertEqual(b'stderr\n', con.read_stderr(4096)) def test_ssh_config_parsing(self) -> None: """Test that SSH config is properly parsed and used by ParamikoSSHVendor.""" # Create a temporary SSH config file with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f: f.write( f""" Host testserver HostName 127.0.0.1 User testuser Port {self.port} IdentityFile /path/to/key """ ) config_path = f.name try: # Mock the config path with patch( "dulwich.contrib.paramiko_vendor.os.path.expanduser" ) as mock_expanduser: def side_effect(path): if path == "~/.ssh/config": return config_path return path mock_expanduser.side_effect = side_effect vendor = ParamikoSSHVendor( allow_agent=False, look_for_keys=False, ) # Test that SSH config values are loaded host_config = vendor.ssh_config.lookup("testserver") self.assertEqual(host_config["hostname"], "127.0.0.1") self.assertEqual(host_config["user"], "testuser") self.assertEqual(host_config["port"], str(self.port)) self.assertIn("/path/to/key", host_config["identityfile"]) finally: os.unlink(config_path) @skipIf(not has_paramiko, "paramiko is not installed") class ParamikoSSHVendorRealServerTests(TestCase): """Tests for ParamikoSSHVendor using a real SSH server listening on TCP.""" def setUp(self) -> None: self.ssh_server = SSHServer() self.ssh_server.start() socket.setdefaulttimeout(10) self.addCleanup(socket.setdefaulttimeout, None) self.addCleanup(self.ssh_server.stop) def _run_command(self, command, **kwargs): """Helper to run a command with default vendor settings.""" vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False) kwargs.setdefault("port", self.ssh_server.port) kwargs.setdefault("username", USER) return vendor.run_command("127.0.0.1", command, **kwargs) def _test_echo(self, con, data): """Helper to test echo functionality.""" con.write(data) response = con.read(len(data)) self.assertEqual(data, response) def test_password_authentication_success(self) -> None: """Test successful password authentication.""" con = self._run_command("test_password_auth", password=PASSWORD) self.assertIn(b"test_password_auth", self.ssh_server.commands) self._test_echo(con, b"hello\n") con.close() def test_key_authentication_success(self) -> None: """Test successful key authentication.""" key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY)) con = self._run_command("test_key_auth", pkey=key) self.assertIn(b"test_key_auth", self.ssh_server.commands) self._test_echo(con, b"key_test\n") con.close() def test_authentication_failures(self) -> None: """Test authentication failures.""" # Wrong password with self.assertRaises(paramiko.AuthenticationException): self._run_command("should_fail", password="wrong_password") # Wrong key wrong_key = paramiko.RSAKey.generate(2048) with self.assertRaises(paramiko.AuthenticationException): self._run_command("should_fail", pkey=wrong_key) def test_connection_errors(self) -> None: """Test various connection errors.""" vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False) # Non-existent port with self.assertRaises((OSError, ConnectionRefusedError)): vendor.run_command( "127.0.0.1", "fail", username=USER, port=65432, password=PASSWORD ) # Invalid hostname with self.assertRaises((socket.gaierror, OSError)): vendor.run_command( "invalid.hostname.example.com", "fail", username=USER, password=PASSWORD ) def test_data_transfer(self) -> None: """Test various data transfer scenarios.""" con = self._run_command("test_data", password=PASSWORD) # Large data (10KB) large_data = b"X" * 10240 self._test_echo(con, large_data) # Binary data with all byte values binary_data = bytes(range(256)) self._test_echo(con, binary_data) con.close() def test_multiple_connections(self) -> None: """Test multiple sequential connections.""" # First connection con1 = self._run_command("test_connection_1", password=PASSWORD) self._test_echo(con1, b"first\n") con1.close() # Second connection con2 = self._run_command("test_connection_2", password=PASSWORD) self._test_echo(con2, b"second\n") con2.close() # Verify both commands were recorded self.assertIn(b"test_connection_1", self.ssh_server.commands) self.assertIn(b"test_connection_2", self.ssh_server.commands) def test_key_from_file(self) -> None: """Test authentication using key file.""" with tempfile.NamedTemporaryFile(mode="w", suffix=".key", delete=False) as f: f.write(CLIENT_KEY) key_path = f.name try: con = self._run_command("test_key_from_file", key_filename=key_path) self.assertIn(b"test_key_from_file", self.ssh_server.commands) self._test_echo(con, b"file_key_test\n") con.close() finally: os.unlink(key_path) def test_protocol_versions(self) -> None: """Test protocol version handling.""" # Protocol version 2 (default) con = self._run_command( "test_protocol_v2", password=PASSWORD, protocol_version=2 ) self.assertIn(b"test_protocol_v2", self.ssh_server.commands) con.close() # Protocol version 1 con = self._run_command( "test_protocol_v1", password=PASSWORD, protocol_version=1 ) self.assertIn(b"test_protocol_v1", self.ssh_server.commands) con.close() def test_vendor_options(self) -> None: """Test vendor initialization options.""" # Test with timeout vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False, timeout=1) con = vendor.run_command( "127.0.0.1", "test_timeout", username=USER, port=self.ssh_server.port, password=PASSWORD, ) self.assertIn(b"test_timeout", self.ssh_server.commands) con.close() def test_can_read(self) -> None: """Test can_read functionality.""" con = self._run_command("test_can_read", password=PASSWORD) # Check can_read returns bool self.assertIsInstance(con.can_read(), bool) # Send data and verify echo self._test_echo(con, b"test_data\n") con.close() def test_partial_reads(self) -> None: """Test reading data in small chunks.""" con = self._run_command("test_partial", password=PASSWORD) test_data = b"0123456789" * 10 # 100 bytes con.write(test_data) # Read in 10-byte chunks received_data = b"" while len(received_data) < len(test_data): chunk = con.read(10) if not chunk: break received_data += chunk self.assertEqual(test_data, received_data) con.close() def test_ssh_config_integration(self) -> None: """Test SSH config integration.""" with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f: f.write(f""" Host testserver HostName 127.0.0.1 User {USER} Port {self.ssh_server.port} """) config_path = f.name try: with patch( "dulwich.contrib.paramiko_vendor.os.path.expanduser" ) as mock_expanduser: mock_expanduser.side_effect = ( lambda p: config_path if p == "~/.ssh/config" else p ) vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False) con = vendor.run_command( "testserver", "test_ssh_config", password=PASSWORD ) self.assertIn(b"test_ssh_config", self.ssh_server.commands) self._test_echo(con, b"config_test\n") con.close() finally: os.unlink(config_path)