|
@@ -31,10 +31,13 @@ the dulwich.client.get_ssh_vendor attribute:
|
|
|
This implementation is experimental and does not have any tests.
|
|
|
"""
|
|
|
|
|
|
+import os
|
|
|
+import warnings
|
|
|
from typing import Any, BinaryIO, Optional, cast
|
|
|
|
|
|
import paramiko
|
|
|
import paramiko.client
|
|
|
+import paramiko.config
|
|
|
|
|
|
|
|
|
class _ParamikoWrapper:
|
|
@@ -78,6 +81,22 @@ class ParamikoSSHVendor:
|
|
|
|
|
|
def __init__(self, **kwargs: object) -> None:
|
|
|
self.kwargs = kwargs
|
|
|
+ self.ssh_config = self._load_ssh_config()
|
|
|
+
|
|
|
+ def _load_ssh_config(self) -> paramiko.config.SSHConfig:
|
|
|
+ """Load SSH configuration from ~/.ssh/config."""
|
|
|
+ ssh_config = paramiko.config.SSHConfig()
|
|
|
+ config_path = os.path.expanduser("~/.ssh/config")
|
|
|
+ try:
|
|
|
+ with open(config_path) as config_file:
|
|
|
+ ssh_config.parse(config_file)
|
|
|
+ except FileNotFoundError:
|
|
|
+ # Config file doesn't exist - this is normal, ignore silently
|
|
|
+ pass
|
|
|
+ except (OSError, PermissionError) as e:
|
|
|
+ # Config file exists but can't be read - warn user
|
|
|
+ warnings.warn(f"Could not read SSH config file {config_path}: {e}")
|
|
|
+ return ssh_config
|
|
|
|
|
|
def run_command(
|
|
|
self,
|
|
@@ -93,18 +112,39 @@ class ParamikoSSHVendor:
|
|
|
) -> _ParamikoWrapper:
|
|
|
client = paramiko.SSHClient()
|
|
|
|
|
|
- connection_kwargs: dict[str, Any] = {"hostname": host}
|
|
|
+ # Get SSH config for this host
|
|
|
+ host_config = self.ssh_config.lookup(host)
|
|
|
+
|
|
|
+ connection_kwargs: dict[str, Any] = {
|
|
|
+ "hostname": host_config.get("hostname", host)
|
|
|
+ }
|
|
|
connection_kwargs.update(self.kwargs)
|
|
|
+
|
|
|
+ # Use SSH config values if not explicitly provided
|
|
|
if username:
|
|
|
connection_kwargs["username"] = username
|
|
|
+ elif "user" in host_config:
|
|
|
+ connection_kwargs["username"] = host_config["user"]
|
|
|
+
|
|
|
if port:
|
|
|
connection_kwargs["port"] = port
|
|
|
+ elif "port" in host_config:
|
|
|
+ connection_kwargs["port"] = int(host_config["port"])
|
|
|
+
|
|
|
if password:
|
|
|
connection_kwargs["password"] = password
|
|
|
if pkey:
|
|
|
connection_kwargs["pkey"] = pkey
|
|
|
if key_filename:
|
|
|
connection_kwargs["key_filename"] = key_filename
|
|
|
+ elif "identityfile" in host_config:
|
|
|
+ # Use the first identity file from SSH config
|
|
|
+ identity_files = host_config["identityfile"]
|
|
|
+ if isinstance(identity_files, list) and identity_files:
|
|
|
+ connection_kwargs["key_filename"] = identity_files[0]
|
|
|
+ elif isinstance(identity_files, str):
|
|
|
+ connection_kwargs["key_filename"] = identity_files
|
|
|
+
|
|
|
connection_kwargs.update(kwargs)
|
|
|
|
|
|
policy = paramiko.client.MissingHostKeyPolicy()
|