# test_paramiko_vendor.py
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Tests for paramiko_vendor."""
import os
import socket
import tempfile
import threading
import time
from io import StringIO
from unittest import skipIf
from unittest.mock import patch
from .. import TestCase
try:
import paramiko
except ImportError:
has_paramiko = False
else:
has_paramiko = True
import paramiko.transport
from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
class Server(paramiko.ServerInterface):
    """Paramiko server policy for the tests.

    Accepts only the fixed test user (by password or by the client test key),
    allows ``session`` channels, and appends every exec request to the
    ``commands`` list supplied by the caller.

    See http://docs.paramiko.org/en/2.4/api/server.html.
    """

    def __init__(self, commands, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Shared with the test so it can assert on what was requested.
        self.commands = commands

    def check_channel_exec_request(self, channel, command) -> bool:
        """Record ``command`` and accept the exec request."""
        self.commands.append(command)
        return True

    def check_auth_password(self, username, password):
        """Succeed only for the test user with the test password."""
        credentials_ok = username == USER and password == PASSWORD
        return paramiko.AUTH_SUCCESSFUL if credentials_ok else paramiko.AUTH_FAILED

    def check_auth_publickey(self, username, key):
        """Succeed only for the test user presenting the client test key."""
        expected_key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
        key_ok = username == USER and key == expected_key
        return paramiko.AUTH_SUCCESSFUL if key_ok else paramiko.AUTH_FAILED

    def check_channel_request(self, kind, chanid):
        """Allow ``session`` channels and refuse every other kind."""
        if kind != "session":
            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
        return paramiko.OPEN_SUCCEEDED

    def get_allowed_auths(self, username) -> str:
        """Return the comma-separated auth methods offered to clients."""
        return ",".join(("password", "publickey"))
class SSHServer:
    """A real SSH server using Paramiko that listens on a TCP port.

    The server binds to an ephemeral port on 127.0.0.1, runs one thread per
    accepted TCP connection and one thread per SSH channel, records every
    exec request in ``commands`` (via ``Server``) and echoes back whatever a
    client writes to a channel.
    """

    def __init__(self):
        # Exec requests seen on any connection; handed to each Server.
        self.commands = []
        self.server_socket = None
        self.server_thread = None
        # Defined up front so reading it before start() yields None instead
        # of raising AttributeError.
        self.port = None
        self.host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY))
        self.running = False
        self.connection_threads = []

    def start(self):
        """Start the SSH server on a random port."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(("127.0.0.1", 0))
        self.port = self.server_socket.getsockname()[1]
        self.server_socket.listen(5)
        self.running = True
        self.server_thread = threading.Thread(target=self._run_server, daemon=True)
        self.server_thread.start()
        # Give the server a moment to start up
        time.sleep(0.1)

    def stop(self):
        """Stop the SSH server.

        Safe to call on a server that was never started, and more than once.
        """
        self.running = False
        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                # Best effort: the socket may already be unusable.
                pass
        # Join the accept loop first: once it has exited no further entries
        # can be appended to connection_threads, so none are missed below.
        if self.server_thread and self.server_thread.is_alive():
            self.server_thread.join(timeout=5)
        # Clean up connection threads (iterate over a snapshot).
        for thread in list(self.connection_threads):
            if thread.is_alive():
                thread.join(timeout=1)

    def _run_server(self):
        """Main server loop: accept TCP connections until stopped."""
        try:
            # Allow checking self.running periodically
            self.server_socket.settimeout(1.0)
        except OSError:
            # stop() closed the socket before this thread got going; that is
            # a normal shutdown, not an error.
            if not self.running:
                return
            raise
        while self.running:
            try:
                client_socket, _addr = self.server_socket.accept()
            except TimeoutError:
                # Normal timeout, continue to check if we should keep running
                continue
            except OSError:
                # Socket was closed, exit gracefully
                if not self.running:
                    break
                # Otherwise propagate the original error
                raise
            # Handle each connection in a separate thread
            conn_thread = threading.Thread(
                target=self._handle_connection, args=(client_socket,), daemon=True
            )
            conn_thread.start()
            self.connection_threads.append(conn_thread)

    def _handle_connection(self, client_socket):
        """Handle a single SSH connection."""
        transport = None
        try:
            transport = paramiko.Transport(client_socket)
            transport.add_server_key(self.host_key)
            server = Server(self.commands)
            transport.start_server(server=server)
            # Wait for channel requests and handle them
            while self.running and transport.is_active():
                channel = transport.accept(1)
                if channel is None:
                    continue
                # Handle channel in a separate thread to allow multiple channels
                channel_thread = threading.Thread(
                    target=self._handle_channel, args=(channel,), daemon=True
                )
                channel_thread.start()
        except paramiko.SSHException as e:
            print(f"SSH error in connection handler: {e}")
        except OSError as e:
            print(f"Socket error in connection handler: {e}")
        finally:
            if transport:
                transport.close()
            if client_socket:
                client_socket.close()

    def _handle_channel(self, channel):
        """Handle a single SSH channel - echo server."""
        try:
            # Blocking reads with a 10 second timeout
            channel.setblocking(True)
            channel.settimeout(10.0)
            # Read all available data and echo it back
            while True:
                try:
                    data = channel.recv(4096)
                    if not data:
                        break
                    # send() may transmit only part of the buffer; sendall()
                    # keeps going until everything has been echoed.
                    channel.sendall(data)
                except TimeoutError:
                    # No more data available, break
                    break
        except paramiko.SSHException as e:
            print(f"SSH error in channel handler: {e}")
        except OSError as e:
            print(f"Socket error in channel handler: {e}")
        finally:
            try:
                channel.close()
            except OSError:
                pass
# Credentials that Server.check_auth_password accepts; the tests log in with them.
USER = "testuser"
PASSWORD = "test"
# PEM-encoded RSA private key loaded as the host key of the in-process test
# servers (SSHServer.__init__ and ParamikoSSHVendorTests._run).
SERVER_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAy/L1sSYAzxsMprtNXW4u/1jGXXkQmQ2xtmKVlR+RlIL3a1BH
bzTpPlZyjltAAwzIP8XRh0iJFKz5y3zSQChhX47ZGN0NvQsVct8R+YwsUonwfAJ+
JN0KBKKvC8fPHlzqBr3gX+ZxqsFH934tQ6wdQPH5eQWtdM8L826lMsH1737uyTGk
+mCSDjL3c6EzY83g7qhkJU2R4qbi6ne01FaWADzG8sOzXnHT+xpxtk8TTT8yCVUY
MmBNsSoA/ka3iWz70ghB+6Xb0WpFJZXWq1oYovviPAfZGZSrxBZMxsWMye70SdLl
TqsBEt0+miIcm9s0fvjWvQuhaHX6mZs5VO4r5QIDAQABAoIBAGYqeYWaYgFdrYLA
hUrubUCg+g3NHdFuGL4iuIgRXl4lFUh+2KoOuWDu8Uf60iA1AQNhV0sLvQ/Mbv3O
s4xMLisuZfaclctDiCUZNenqnDFkxEF7BjH1QJV94W5nU4wEQ3/JEmM4D2zYkfKb
FJW33JeyH6TOgUvohDYYEU1R+J9V8qA243p+ui1uVtNI6Pb0TXJnG5y9Ny4vkSWH
Fi0QoMPR1r9xJ4SEearGzA/crb4SmmDTKhGSoMsT3d5ATieLmwcS66xWz8w4oFGJ
yzDq24s4Fp9ccNjMf/xR8XRiekJv835gjEqwF9IXyvgOaq6XJ1iCqGPFDKa25nui
JnEstOkCgYEA/ZXk7aIanvdeJlTqpX578sJfCnrXLydzE8emk1b7+5mrzGxQ4/pM
PBQs2f8glT3t0O0mRX9NoRqnwrid88/b+cY4NCOICFZeasX336/gYQxyVeRLJS6Z
hnGEQqry8qS7PdKAyeHMNmZFrUh4EiHiObymEfQS+mkRUObn0cGBTw8CgYEAzeQU
D2baec1DawjppKaRynAvWjp+9ry1lZx9unryKVRwjRjkEpw+b3/+hdaF1IvsVSce
cNj+6W2guZ2tyHuPhZ64/4SJVyE2hKDSKD4xTb2nVjsMeN0bLD2UWXC9mwbx8nWa
2tmtUZ7a/okQb2cSdosJinRewLNqXIsBXamT1csCgYEA0cXb2RCOQQ6U3dTFPx4A
3vMXuA2iUKmrsqMoEx6T2LBow/Sefdkik1iFOdipVYwjXP+w9zC2QR1Rxez/DR/X
8ymceNUjxPHdrSoTQQG29dFcC92MpDeGXQcuyA+uZjcLhbrLOzYEvsOfxBb87NMG
14hNQPDNekTMREafYo9WrtUCgYAREK54+FVzcwf7fymedA/xb4r9N4v+d3W1iNsC
8d3Qfyc1CrMct8aVB07ZWQaOr2pPRIbJY7L9NhD0UZVt4I/sy1MaGqonhqE2LP4+
R6legDG2e/50ph7yc8gwAaA1kUXMiuLi8Nfkw/3yyvmJwklNegi4aRzRbA2Mzhi2
4q9WMQKBgQCb0JNyxHG4pvLWCF/j0Sm1FfvrpnqSv5678n1j4GX7Ka/TubOK1Y4K
U+Oib7dKa/zQMWehVFNTayrsq6bKVZ6q7zG+IHiRLw4wjeAxREFH6WUjDrn9vl2l
D48DKbBuBwuVOJWyq3qbfgJXojscgNQklrsPdXVhDwOF0dYxP89HnA==
-----END RSA PRIVATE KEY-----"""
# PEM-encoded RSA private key the tests authenticate with;
# Server.check_auth_publickey compares the presented key against this one.
CLIENT_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxvREKSElPOm/0z/nPO+j5rk2tjdgGcGc7We1QZ6TRXYLu7nN
GeEFIL4p8N1i6dmB+Eydt7xqCU79MWD6Yy4prFe1+/K1wCDUxIbFMxqQcX5zjJzd
i8j8PbcaUlVhP/OkjtkSxrXaGDO1BzfdV4iEBtTV/2l3zmLKJlt3jnOHLczP24CB
DTQKp3rKshbRefzot9Y+wnaK692RsYgsyo9YEP0GyWKG9topCHk13r46J6vGLeuj
ryUKqmbLJkzbJbIcEqwTDo5iHaCVqaMr5Hrb8BdMucSseqZQJsXSd+9tdRcIblUQ
38kZjmFMm4SFbruJcpZCNM2wNSZPIRX+3eiwNwIDAQABAoIBAHSacOBSJsr+jIi5
KUOTh9IPtzswVUiDKwARCjB9Sf8p4lKR4N1L/n9kNJyQhApeikgGT2GCMftmqgoo
tlculQoHFgemBlOmak0MV8NNzF5YKEy/GzF0CDH7gJfEpoyetVFrdA+2QS5yD6U9
XqKQxiBi2VEqdScmyyeT8AwzNYTnPeH/DOEcnbdRjqiy/CD79F49CQ1lX1Fuqm0K
I7BivBH1xo/rVnUP4F+IzocDqoga+Pjdj0LTXIgJlHQDSbhsQqWujWQDDuKb+MAw
sNK4Zf8ErV3j1PyA7f/M5LLq6zgstkW4qikDHo4SpZX8kFOO8tjqb7kujj7XqeaB
CxqrOTECgYEA73uWkrohcmDJ4KqbuL3tbExSCOUiaIV+sT1eGPNi7GCmXD4eW5Z4
75v2IHymW83lORSu/DrQ6sKr1nkuRpqr2iBzRmQpl/H+wahIhBXlnJ25uUjDsuPO
1Pq2LcmyD+jTxVnmbSe/q7O09gZQw3I6H4+BMHmpbf8tC97lqimzpJ0CgYEA1K0W
ZL70Xtn9quyHvbtae/BW07NZnxvUg4UaVIAL9Zu34JyplJzyzbIjrmlDbv6aRogH
/KtuG9tfbf55K/jjqNORiuRtzt1hUN1ye4dyW7tHx2/7lXdlqtyK40rQl8P0kqf8
zaS6BqjnobgSdSpg32rWoL/pcBHPdJCJEgQ8zeMCgYEA0/PK8TOhNIzrP1dgGSKn
hkkJ9etuB5nW5mEM7gJDFDf6JPupfJ/xiwe6z0fjKK9S57EhqgUYMB55XYnE5iIw
ZQ6BV9SAZ4V7VsRs4dJLdNC3tn/rDGHJBgCaym2PlbsX6rvFT+h1IC8dwv0V79Ui
Ehq9WTzkMoE8yhvNokvkPZUCgYEAgBAFxv5xGdh79ftdtXLmhnDvZ6S8l6Fjcxqo
Ay/jg66Tp43OU226iv/0mmZKM8Dd1xC8dnon4GBVc19jSYYiWBulrRPlx0Xo/o+K
CzZBN1lrXH1i6dqufpc0jq8TMf/N+q1q/c1uMupsKCY1/xVYpc+ok71b7J7c49zQ
nOeuUW8CgYA9Infooy65FTgbzca0c9kbCUBmcAPQ2ItH3JcPKWPQTDuV62HcT00o
fZdIV47Nez1W5Clk191RMy8TXuqI54kocciUWpThc6j44hz49oUueb8U4bLcEHzA
WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w==
-----END RSA PRIVATE KEY-----"""
@skipIf(not has_paramiko, "paramiko is not installed")
class ParamikoSSHVendorTests(TestCase):
    """Tests for ParamikoSSHVendor against a single-connection test server.

    ``setUp`` opens a listening socket on 127.0.0.1 and starts a helper
    thread that accepts at most one client and runs a Paramiko server
    transport on it. Exec requests end up in ``self.commands``.
    """

    def setUp(self) -> None:
        self.commands = []
        # Restore whatever default was in force before the test instead of
        # unconditionally resetting it to None.
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(10)
        self.addCleanup(socket.setdefaulttimeout, previous_timeout)

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(self.socket.close)
        self.socket.bind(("127.0.0.1", 0))
        self.socket.listen(5)
        # Short accept timeout so the helper thread can notice the end of a
        # test that never connected, rather than blocking tearDown for the
        # full 10 second default timeout.
        self.socket.settimeout(0.1)
        self.port = self.socket.getsockname()[1]

        self._stop_accepting = threading.Event()
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def tearDown(self) -> None:
        # Tell the accept loop to give up, then wait for the thread to finish.
        self._stop_accepting.set()
        self.thread.join()

    def _run(self) -> bool | None:
        """Accept one client and start a Paramiko server transport on it.

        Returns False when no client was accepted (the test ended first or
        the listening socket failed); otherwise returns None once the server
        transport has been started.
        """
        conn = None
        while conn is None:
            if self._stop_accepting.is_set():
                return False
            try:
                conn, _addr = self.socket.accept()
            except TimeoutError:
                # Nothing connected during this poll interval; check again.
                continue
            except OSError:
                return False

        self.transport = paramiko.Transport(conn)
        self.addCleanup(self.transport.close)
        host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY))
        self.transport.add_server_key(host_key)
        server = Server(self.commands)
        self.transport.start_server(server=server)

    def test_run_command_password(self) -> None:
        """run_command authenticates with a password and issues the command."""
        vendor = ParamikoSSHVendor(
            allow_agent=False,
            look_for_keys=False,
        )
        con = vendor.run_command(
            "127.0.0.1",
            "test_run_command_password",
            username=USER,
            port=self.port,
            password=PASSWORD,
        )
        # Do not leave the client channel open after the test.
        self.addCleanup(con.close)
        self.assertIn(b"test_run_command_password", self.commands)

    def test_run_command_with_privkey(self) -> None:
        """run_command authenticates with a private key and issues the command."""
        key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
        vendor = ParamikoSSHVendor(
            allow_agent=False,
            look_for_keys=False,
        )
        con = vendor.run_command(
            "127.0.0.1",
            "test_run_command_with_privkey",
            username=USER,
            port=self.port,
            pkey=key,
        )
        self.addCleanup(con.close)
        self.assertIn(b"test_run_command_with_privkey", self.commands)

    def test_run_command_data_transfer(self) -> None:
        """Data the server writes to the channel is readable by the client."""
        vendor = ParamikoSSHVendor(
            allow_agent=False,
            look_for_keys=False,
        )
        con = vendor.run_command(
            "127.0.0.1",
            "test_run_command_data_transfer",
            username=USER,
            port=self.port,
            password=PASSWORD,
        )
        self.addCleanup(con.close)
        self.assertIn(b"test_run_command_data_transfer", self.commands)

        channel = self.transport.accept(5)
        # accept() returns None on timeout; fail with a clear message instead
        # of an AttributeError on the next line.
        self.assertIsNotNone(channel, "server did not receive a channel in time")
        channel.send(b"stdout\n")
        channel.send_stderr(b"stderr\n")
        channel.close()

        # FIXME: can_read() currently returns False here.
        # self.assertTrue(con.can_read())
        self.assertEqual(b"stdout\n", con.read(4096))
        # FIXME: read_stderr() currently returns an empty string here.
        # self.assertEqual(b'stderr\n', con.read_stderr(4096))

    def test_ssh_config_parsing(self) -> None:
        """Test that SSH config is properly parsed and used by ParamikoSSHVendor."""
        # Create a temporary SSH config file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f:
            f.write(
                f"""
Host testserver
HostName 127.0.0.1
User testuser
Port {self.port}
IdentityFile /path/to/key
"""
            )
            config_path = f.name

        def side_effect(path):
            # Redirect only the user's SSH config to the temporary file.
            if path == "~/.ssh/config":
                return config_path
            return path

        try:
            # Mock the config path
            with patch(
                "dulwich.contrib.paramiko_vendor.os.path.expanduser"
            ) as mock_expanduser:
                mock_expanduser.side_effect = side_effect
                vendor = ParamikoSSHVendor(
                    allow_agent=False,
                    look_for_keys=False,
                )
                # Test that SSH config values are loaded
                host_config = vendor.ssh_config.lookup("testserver")
                self.assertEqual(host_config["hostname"], "127.0.0.1")
                self.assertEqual(host_config["user"], "testuser")
                self.assertEqual(host_config["port"], str(self.port))
                self.assertIn("/path/to/key", host_config["identityfile"])
        finally:
            os.unlink(config_path)
@skipIf(not has_paramiko, "paramiko is not installed")
class ParamikoSSHVendorRealServerTests(TestCase):
    """Tests for ParamikoSSHVendor using a real SSH server listening on TCP."""

    def setUp(self) -> None:
        self.ssh_server = SSHServer()
        # Register the shutdown first so a failure part-way through start()
        # still releases the listening socket.
        self.addCleanup(self.ssh_server.stop)
        self.ssh_server.start()
        # Restore the previous default timeout rather than forcing None.
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(10)
        self.addCleanup(socket.setdefaulttimeout, previous_timeout)

    def _run_command(self, command, **kwargs):
        """Helper to run a command with default vendor settings.

        ``port`` and ``username`` default to the test server's port and the
        test user. The returned connection is also closed during cleanup, so
        it is released even when an assertion fails before the test's own
        ``close()`` call.
        """
        vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)
        kwargs.setdefault("port", self.ssh_server.port)
        kwargs.setdefault("username", USER)
        con = vendor.run_command("127.0.0.1", command, **kwargs)
        self.addCleanup(con.close)
        return con

    def _test_echo(self, con, data):
        """Helper to test echo functionality."""
        con.write(data)
        response = con.read(len(data))
        self.assertEqual(data, response)

    def test_password_authentication_success(self) -> None:
        """Test successful password authentication."""
        con = self._run_command("test_password_auth", password=PASSWORD)
        self.assertIn(b"test_password_auth", self.ssh_server.commands)
        self._test_echo(con, b"hello\n")
        con.close()

    def test_key_authentication_success(self) -> None:
        """Test successful key authentication."""
        key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
        con = self._run_command("test_key_auth", pkey=key)
        self.assertIn(b"test_key_auth", self.ssh_server.commands)
        self._test_echo(con, b"key_test\n")
        con.close()

    def test_authentication_failures(self) -> None:
        """Test authentication failures."""
        # Wrong password
        with self.assertRaises(paramiko.AuthenticationException):
            self._run_command("should_fail", password="wrong_password")

        # Wrong key
        wrong_key = paramiko.RSAKey.generate(2048)
        with self.assertRaises(paramiko.AuthenticationException):
            self._run_command("should_fail", pkey=wrong_key)

    def test_connection_errors(self) -> None:
        """Test various connection errors."""
        vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)

        # Port with no listener: reserve an ephemeral port and release it
        # again instead of assuming that a hard-coded port number is free.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("127.0.0.1", 0))
            unused_port = probe.getsockname()[1]
        # ConnectionRefusedError is a subclass of OSError.
        with self.assertRaises(OSError):
            vendor.run_command(
                "127.0.0.1", "fail", username=USER, port=unused_port, password=PASSWORD
            )

        # Invalid hostname: the ".invalid" TLD is reserved and never resolves.
        # socket.gaierror is a subclass of OSError.
        with self.assertRaises(OSError):
            vendor.run_command(
                "nonexistent.invalid", "fail", username=USER, password=PASSWORD
            )

    def test_data_transfer(self) -> None:
        """Test various data transfer scenarios."""
        con = self._run_command("test_data", password=PASSWORD)

        # Large data (10KB)
        large_data = b"X" * 10240
        self._test_echo(con, large_data)

        # Binary data with all byte values
        binary_data = bytes(range(256))
        self._test_echo(con, binary_data)

        con.close()

    def test_multiple_connections(self) -> None:
        """Test multiple sequential connections."""
        # First connection
        con1 = self._run_command("test_connection_1", password=PASSWORD)
        self._test_echo(con1, b"first\n")
        con1.close()

        # Second connection
        con2 = self._run_command("test_connection_2", password=PASSWORD)
        self._test_echo(con2, b"second\n")
        con2.close()

        # Verify both commands were recorded
        self.assertIn(b"test_connection_1", self.ssh_server.commands)
        self.assertIn(b"test_connection_2", self.ssh_server.commands)

    def test_key_from_file(self) -> None:
        """Test authentication using key file."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".key", delete=False) as f:
            f.write(CLIENT_KEY)
            key_path = f.name
        try:
            con = self._run_command("test_key_from_file", key_filename=key_path)
            self.assertIn(b"test_key_from_file", self.ssh_server.commands)
            self._test_echo(con, b"file_key_test\n")
            con.close()
        finally:
            os.unlink(key_path)

    def test_protocol_versions(self) -> None:
        """Test protocol version handling."""
        # Protocol version 2 (default)
        con = self._run_command(
            "test_protocol_v2", password=PASSWORD, protocol_version=2
        )
        self.assertIn(b"test_protocol_v2", self.ssh_server.commands)
        con.close()

        # Protocol version 1
        con = self._run_command(
            "test_protocol_v1", password=PASSWORD, protocol_version=1
        )
        self.assertIn(b"test_protocol_v1", self.ssh_server.commands)
        con.close()

    def test_vendor_options(self) -> None:
        """Test vendor initialization options."""
        # Test with timeout
        vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False, timeout=1)
        con = vendor.run_command(
            "127.0.0.1",
            "test_timeout",
            username=USER,
            port=self.ssh_server.port,
            password=PASSWORD,
        )
        self.addCleanup(con.close)
        self.assertIn(b"test_timeout", self.ssh_server.commands)
        con.close()

    def test_can_read(self) -> None:
        """Test can_read functionality."""
        con = self._run_command("test_can_read", password=PASSWORD)

        # Check can_read returns bool
        self.assertIsInstance(con.can_read(), bool)

        # Send data and verify echo
        self._test_echo(con, b"test_data\n")
        con.close()

    def test_partial_reads(self) -> None:
        """Test reading data in small chunks."""
        con = self._run_command("test_partial", password=PASSWORD)
        test_data = b"0123456789" * 10  # 100 bytes
        con.write(test_data)

        # Read in 10-byte chunks; stop early if the channel reports end of data
        chunks = []
        received = 0
        while received < len(test_data):
            chunk = con.read(10)
            if not chunk:
                break
            chunks.append(chunk)
            received += len(chunk)

        self.assertEqual(test_data, b"".join(chunks))
        con.close()

    def test_ssh_config_integration(self) -> None:
        """Test SSH config integration."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f:
            f.write(f"""
Host testserver
HostName 127.0.0.1
User {USER}
Port {self.ssh_server.port}
""")
            config_path = f.name
        try:
            with patch(
                "dulwich.contrib.paramiko_vendor.os.path.expanduser"
            ) as mock_expanduser:
                # Redirect only the user's SSH config to the temporary file.
                mock_expanduser.side_effect = (
                    lambda p: config_path if p == "~/.ssh/config" else p
                )
                vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)
                con = vendor.run_command(
                    "testserver", "test_ssh_config", password=PASSWORD
                )
                self.addCleanup(con.close)
                self.assertIn(b"test_ssh_config", self.ssh_server.commands)
                self._test_echo(con, b"config_test\n")
                con.close()
        finally:
            os.unlink(config_path)