test_paramiko_vendor.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. # test_paramiko_vendor.py
  2. #
  3. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for paramiko_vendor."""
  21. import os
  22. import socket
  23. import tempfile
  24. import threading
  25. import time
  26. from io import StringIO
  27. from unittest import skipIf
  28. from unittest.mock import patch
  29. from .. import TestCase
  30. try:
  31. import paramiko
  32. except ImportError:
  33. has_paramiko = False
  34. else:
  35. has_paramiko = True
  36. import paramiko.transport
  37. from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
  38. class Server(paramiko.ServerInterface):
  39. """http://docs.paramiko.org/en/2.4/api/server.html."""
  40. def __init__(self, commands, *args, **kwargs) -> None:
  41. super().__init__(*args, **kwargs)
  42. self.commands = commands
  43. def check_channel_exec_request(self, channel, command) -> bool:
  44. self.commands.append(command)
  45. return True
  46. def check_auth_password(self, username, password):
  47. if username == USER and password == PASSWORD:
  48. return paramiko.AUTH_SUCCESSFUL
  49. return paramiko.AUTH_FAILED
  50. def check_auth_publickey(self, username, key):
  51. pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
  52. if username == USER and key == pubkey:
  53. return paramiko.AUTH_SUCCESSFUL
  54. return paramiko.AUTH_FAILED
  55. def check_channel_request(self, kind, chanid):
  56. if kind == "session":
  57. return paramiko.OPEN_SUCCEEDED
  58. return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
  59. def get_allowed_auths(self, username) -> str:
  60. return "password,publickey"
  61. class SSHServer:
  62. """A real SSH server using Paramiko that listens on a TCP port."""
  63. def __init__(self):
  64. self.commands = []
  65. self.server_socket = None
  66. self.server_thread = None
  67. self.host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY))
  68. self.running = False
  69. self.connection_threads = []
  70. def start(self):
  71. """Start the SSH server on a random port."""
  72. self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  73. self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  74. self.server_socket.bind(("127.0.0.1", 0))
  75. self.port = self.server_socket.getsockname()[1]
  76. self.server_socket.listen(5)
  77. self.running = True
  78. self.server_thread = threading.Thread(target=self._run_server)
  79. self.server_thread.daemon = True
  80. self.server_thread.start()
  81. # Give the server a moment to start up
  82. time.sleep(0.1)
  83. def stop(self):
  84. """Stop the SSH server."""
  85. self.running = False
  86. if self.server_socket:
  87. try:
  88. self.server_socket.close()
  89. except OSError:
  90. pass
  91. # Clean up connection threads
  92. for thread in self.connection_threads:
  93. if thread.is_alive():
  94. thread.join(timeout=1)
  95. if self.server_thread and self.server_thread.is_alive():
  96. self.server_thread.join(timeout=5)
  97. def _run_server(self):
  98. """Main server loop."""
  99. self.server_socket.settimeout(
  100. 1.0
  101. ) # Allow checking self.running periodically
  102. while self.running:
  103. try:
  104. client_socket, _addr = self.server_socket.accept()
  105. # Handle each connection in a separate thread
  106. conn_thread = threading.Thread(
  107. target=self._handle_connection, args=(client_socket,)
  108. )
  109. conn_thread.daemon = True
  110. conn_thread.start()
  111. self.connection_threads.append(conn_thread)
  112. except TimeoutError:
  113. # Normal timeout, continue to check if we should keep running
  114. continue
  115. except OSError as e:
  116. # Socket was closed, exit gracefully
  117. if not self.running:
  118. break
  119. # Otherwise re-raise the error
  120. raise e
  121. def _handle_connection(self, client_socket):
  122. """Handle a single SSH connection."""
  123. transport = None
  124. try:
  125. transport = paramiko.Transport(client_socket)
  126. transport.add_server_key(self.host_key)
  127. server = Server(self.commands)
  128. transport.start_server(server=server)
  129. # Wait for channel requests and handle them
  130. while self.running and transport.is_active():
  131. channel = transport.accept(1)
  132. if channel is None:
  133. continue
  134. # Handle channel in a separate thread to allow multiple channels
  135. channel_thread = threading.Thread(
  136. target=self._handle_channel, args=(channel,)
  137. )
  138. channel_thread.daemon = True
  139. channel_thread.start()
  140. except paramiko.SSHException as e:
  141. print(f"SSH error in connection handler: {e}")
  142. except OSError as e:
  143. print(f"Socket error in connection handler: {e}")
  144. finally:
  145. if transport:
  146. transport.close()
  147. if client_socket:
  148. client_socket.close()
  149. def _handle_channel(self, channel):
  150. """Handle a single SSH channel - echo server."""
  151. try:
  152. # Set channel to blocking mode
  153. channel.setblocking(True)
  154. channel.settimeout(10.0)
  155. # Read all available data and echo it back
  156. while True:
  157. try:
  158. data = channel.recv(4096)
  159. if not data:
  160. break
  161. # Echo the data back immediately
  162. channel.send(data)
  163. except TimeoutError:
  164. # No more data available, break
  165. break
  166. except paramiko.SSHException as e:
  167. print(f"SSH error in channel handler: {e}")
  168. except OSError as e:
  169. print(f"Socket error in channel handler: {e}")
  170. finally:
  171. try:
  172. channel.close()
  173. except OSError:
  174. pass
# Credentials that Server.check_auth_password accepts.
USER = "testuser"
PASSWORD = "test"

# PEM-encoded RSA private key used as the SSH host key by the test servers
# in this module.
# NOTE(review): presumably a throwaway key generated for this test suite;
# it must never be used outside of tests.
SERVER_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAy/L1sSYAzxsMprtNXW4u/1jGXXkQmQ2xtmKVlR+RlIL3a1BH
bzTpPlZyjltAAwzIP8XRh0iJFKz5y3zSQChhX47ZGN0NvQsVct8R+YwsUonwfAJ+
JN0KBKKvC8fPHlzqBr3gX+ZxqsFH934tQ6wdQPH5eQWtdM8L826lMsH1737uyTGk
+mCSDjL3c6EzY83g7qhkJU2R4qbi6ne01FaWADzG8sOzXnHT+xpxtk8TTT8yCVUY
MmBNsSoA/ka3iWz70ghB+6Xb0WpFJZXWq1oYovviPAfZGZSrxBZMxsWMye70SdLl
TqsBEt0+miIcm9s0fvjWvQuhaHX6mZs5VO4r5QIDAQABAoIBAGYqeYWaYgFdrYLA
hUrubUCg+g3NHdFuGL4iuIgRXl4lFUh+2KoOuWDu8Uf60iA1AQNhV0sLvQ/Mbv3O
s4xMLisuZfaclctDiCUZNenqnDFkxEF7BjH1QJV94W5nU4wEQ3/JEmM4D2zYkfKb
FJW33JeyH6TOgUvohDYYEU1R+J9V8qA243p+ui1uVtNI6Pb0TXJnG5y9Ny4vkSWH
Fi0QoMPR1r9xJ4SEearGzA/crb4SmmDTKhGSoMsT3d5ATieLmwcS66xWz8w4oFGJ
yzDq24s4Fp9ccNjMf/xR8XRiekJv835gjEqwF9IXyvgOaq6XJ1iCqGPFDKa25nui
JnEstOkCgYEA/ZXk7aIanvdeJlTqpX578sJfCnrXLydzE8emk1b7+5mrzGxQ4/pM
PBQs2f8glT3t0O0mRX9NoRqnwrid88/b+cY4NCOICFZeasX336/gYQxyVeRLJS6Z
hnGEQqry8qS7PdKAyeHMNmZFrUh4EiHiObymEfQS+mkRUObn0cGBTw8CgYEAzeQU
D2baec1DawjppKaRynAvWjp+9ry1lZx9unryKVRwjRjkEpw+b3/+hdaF1IvsVSce
cNj+6W2guZ2tyHuPhZ64/4SJVyE2hKDSKD4xTb2nVjsMeN0bLD2UWXC9mwbx8nWa
2tmtUZ7a/okQb2cSdosJinRewLNqXIsBXamT1csCgYEA0cXb2RCOQQ6U3dTFPx4A
3vMXuA2iUKmrsqMoEx6T2LBow/Sefdkik1iFOdipVYwjXP+w9zC2QR1Rxez/DR/X
8ymceNUjxPHdrSoTQQG29dFcC92MpDeGXQcuyA+uZjcLhbrLOzYEvsOfxBb87NMG
14hNQPDNekTMREafYo9WrtUCgYAREK54+FVzcwf7fymedA/xb4r9N4v+d3W1iNsC
8d3Qfyc1CrMct8aVB07ZWQaOr2pPRIbJY7L9NhD0UZVt4I/sy1MaGqonhqE2LP4+
R6legDG2e/50ph7yc8gwAaA1kUXMiuLi8Nfkw/3yyvmJwklNegi4aRzRbA2Mzhi2
4q9WMQKBgQCb0JNyxHG4pvLWCF/j0Sm1FfvrpnqSv5678n1j4GX7Ka/TubOK1Y4K
U+Oib7dKa/zQMWehVFNTayrsq6bKVZ6q7zG+IHiRLw4wjeAxREFH6WUjDrn9vl2l
D48DKbBuBwuVOJWyq3qbfgJXojscgNQklrsPdXVhDwOF0dYxP89HnA==
-----END RSA PRIVATE KEY-----"""

# PEM-encoded RSA private key for the test client; Server.check_auth_publickey
# accepts only the key loaded from this value.
# NOTE(review): presumably a throwaway key as well; test use only.
CLIENT_KEY = """\
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxvREKSElPOm/0z/nPO+j5rk2tjdgGcGc7We1QZ6TRXYLu7nN
GeEFIL4p8N1i6dmB+Eydt7xqCU79MWD6Yy4prFe1+/K1wCDUxIbFMxqQcX5zjJzd
i8j8PbcaUlVhP/OkjtkSxrXaGDO1BzfdV4iEBtTV/2l3zmLKJlt3jnOHLczP24CB
DTQKp3rKshbRefzot9Y+wnaK692RsYgsyo9YEP0GyWKG9topCHk13r46J6vGLeuj
ryUKqmbLJkzbJbIcEqwTDo5iHaCVqaMr5Hrb8BdMucSseqZQJsXSd+9tdRcIblUQ
38kZjmFMm4SFbruJcpZCNM2wNSZPIRX+3eiwNwIDAQABAoIBAHSacOBSJsr+jIi5
KUOTh9IPtzswVUiDKwARCjB9Sf8p4lKR4N1L/n9kNJyQhApeikgGT2GCMftmqgoo
tlculQoHFgemBlOmak0MV8NNzF5YKEy/GzF0CDH7gJfEpoyetVFrdA+2QS5yD6U9
XqKQxiBi2VEqdScmyyeT8AwzNYTnPeH/DOEcnbdRjqiy/CD79F49CQ1lX1Fuqm0K
I7BivBH1xo/rVnUP4F+IzocDqoga+Pjdj0LTXIgJlHQDSbhsQqWujWQDDuKb+MAw
sNK4Zf8ErV3j1PyA7f/M5LLq6zgstkW4qikDHo4SpZX8kFOO8tjqb7kujj7XqeaB
CxqrOTECgYEA73uWkrohcmDJ4KqbuL3tbExSCOUiaIV+sT1eGPNi7GCmXD4eW5Z4
75v2IHymW83lORSu/DrQ6sKr1nkuRpqr2iBzRmQpl/H+wahIhBXlnJ25uUjDsuPO
1Pq2LcmyD+jTxVnmbSe/q7O09gZQw3I6H4+BMHmpbf8tC97lqimzpJ0CgYEA1K0W
ZL70Xtn9quyHvbtae/BW07NZnxvUg4UaVIAL9Zu34JyplJzyzbIjrmlDbv6aRogH
/KtuG9tfbf55K/jjqNORiuRtzt1hUN1ye4dyW7tHx2/7lXdlqtyK40rQl8P0kqf8
zaS6BqjnobgSdSpg32rWoL/pcBHPdJCJEgQ8zeMCgYEA0/PK8TOhNIzrP1dgGSKn
hkkJ9etuB5nW5mEM7gJDFDf6JPupfJ/xiwe6z0fjKK9S57EhqgUYMB55XYnE5iIw
ZQ6BV9SAZ4V7VsRs4dJLdNC3tn/rDGHJBgCaym2PlbsX6rvFT+h1IC8dwv0V79Ui
Ehq9WTzkMoE8yhvNokvkPZUCgYEAgBAFxv5xGdh79ftdtXLmhnDvZ6S8l6Fjcxqo
Ay/jg66Tp43OU226iv/0mmZKM8Dd1xC8dnon4GBVc19jSYYiWBulrRPlx0Xo/o+K
CzZBN1lrXH1i6dqufpc0jq8TMf/N+q1q/c1uMupsKCY1/xVYpc+ok71b7J7c49zQ
nOeuUW8CgYA9Infooy65FTgbzca0c9kbCUBmcAPQ2ItH3JcPKWPQTDuV62HcT00o
fZdIV47Nez1W5Clk191RMy8TXuqI54kocciUWpThc6j44hz49oUueb8U4bLcEHzA
WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w==
-----END RSA PRIVATE KEY-----"""
  233. @skipIf(not has_paramiko, "paramiko is not installed")
  234. class ParamikoSSHVendorTests(TestCase):
  235. def setUp(self) -> None:
  236. self.commands = []
  237. socket.setdefaulttimeout(10)
  238. self.addCleanup(socket.setdefaulttimeout, None)
  239. self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  240. self.socket.bind(("127.0.0.1", 0))
  241. self.socket.listen(5)
  242. self.addCleanup(self.socket.close)
  243. self.port = self.socket.getsockname()[1]
  244. self.thread = threading.Thread(target=self._run)
  245. self.thread.start()
  246. def tearDown(self) -> None:
  247. self.thread.join()
  248. def _run(self) -> bool | None:
  249. try:
  250. conn, _addr = self.socket.accept()
  251. except OSError:
  252. return False
  253. self.transport = paramiko.Transport(conn)
  254. self.addCleanup(self.transport.close)
  255. host_key = paramiko.RSAKey.from_private_key(StringIO(SERVER_KEY))
  256. self.transport.add_server_key(host_key)
  257. server = Server(self.commands)
  258. self.transport.start_server(server=server)
  259. def test_run_command_password(self) -> None:
  260. vendor = ParamikoSSHVendor(
  261. allow_agent=False,
  262. look_for_keys=False,
  263. )
  264. vendor.run_command(
  265. "127.0.0.1",
  266. "test_run_command_password",
  267. username=USER,
  268. port=self.port,
  269. password=PASSWORD,
  270. )
  271. self.assertIn(b"test_run_command_password", self.commands)
  272. def test_run_command_with_privkey(self) -> None:
  273. key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
  274. vendor = ParamikoSSHVendor(
  275. allow_agent=False,
  276. look_for_keys=False,
  277. )
  278. vendor.run_command(
  279. "127.0.0.1",
  280. "test_run_command_with_privkey",
  281. username=USER,
  282. port=self.port,
  283. pkey=key,
  284. )
  285. self.assertIn(b"test_run_command_with_privkey", self.commands)
  286. def test_run_command_data_transfer(self) -> None:
  287. vendor = ParamikoSSHVendor(
  288. allow_agent=False,
  289. look_for_keys=False,
  290. )
  291. con = vendor.run_command(
  292. "127.0.0.1",
  293. "test_run_command_data_transfer",
  294. username=USER,
  295. port=self.port,
  296. password=PASSWORD,
  297. )
  298. self.assertIn(b"test_run_command_data_transfer", self.commands)
  299. channel = self.transport.accept(5)
  300. channel.send(b"stdout\n")
  301. channel.send_stderr(b"stderr\n")
  302. channel.close()
  303. # Fixme: it's return false
  304. # self.assertTrue(con.can_read())
  305. self.assertEqual(b"stdout\n", con.read(4096))
  306. # Fixme: it's return empty string
  307. # self.assertEqual(b'stderr\n', con.read_stderr(4096))
  308. def test_ssh_config_parsing(self) -> None:
  309. """Test that SSH config is properly parsed and used by ParamikoSSHVendor."""
  310. # Create a temporary SSH config file
  311. with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f:
  312. f.write(
  313. f"""
  314. Host testserver
  315. HostName 127.0.0.1
  316. User testuser
  317. Port {self.port}
  318. IdentityFile /path/to/key
  319. """
  320. )
  321. config_path = f.name
  322. try:
  323. # Mock the config path
  324. with patch(
  325. "dulwich.contrib.paramiko_vendor.os.path.expanduser"
  326. ) as mock_expanduser:
  327. def side_effect(path):
  328. if path == "~/.ssh/config":
  329. return config_path
  330. return path
  331. mock_expanduser.side_effect = side_effect
  332. vendor = ParamikoSSHVendor(
  333. allow_agent=False,
  334. look_for_keys=False,
  335. )
  336. # Test that SSH config values are loaded
  337. host_config = vendor.ssh_config.lookup("testserver")
  338. self.assertEqual(host_config["hostname"], "127.0.0.1")
  339. self.assertEqual(host_config["user"], "testuser")
  340. self.assertEqual(host_config["port"], str(self.port))
  341. self.assertIn("/path/to/key", host_config["identityfile"])
  342. finally:
  343. os.unlink(config_path)
  344. @skipIf(not has_paramiko, "paramiko is not installed")
  345. class ParamikoSSHVendorRealServerTests(TestCase):
  346. """Tests for ParamikoSSHVendor using a real SSH server listening on TCP."""
  347. def setUp(self) -> None:
  348. self.ssh_server = SSHServer()
  349. self.ssh_server.start()
  350. socket.setdefaulttimeout(10)
  351. self.addCleanup(socket.setdefaulttimeout, None)
  352. self.addCleanup(self.ssh_server.stop)
  353. def _run_command(self, command, **kwargs):
  354. """Helper to run a command with default vendor settings."""
  355. vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)
  356. kwargs.setdefault("port", self.ssh_server.port)
  357. kwargs.setdefault("username", USER)
  358. return vendor.run_command("127.0.0.1", command, **kwargs)
  359. def _test_echo(self, con, data):
  360. """Helper to test echo functionality."""
  361. con.write(data)
  362. response = con.read(len(data))
  363. self.assertEqual(data, response)
  364. def test_password_authentication_success(self) -> None:
  365. """Test successful password authentication."""
  366. con = self._run_command("test_password_auth", password=PASSWORD)
  367. self.assertIn(b"test_password_auth", self.ssh_server.commands)
  368. self._test_echo(con, b"hello\n")
  369. con.close()
  370. def test_key_authentication_success(self) -> None:
  371. """Test successful key authentication."""
  372. key = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
  373. con = self._run_command("test_key_auth", pkey=key)
  374. self.assertIn(b"test_key_auth", self.ssh_server.commands)
  375. self._test_echo(con, b"key_test\n")
  376. con.close()
  377. def test_authentication_failures(self) -> None:
  378. """Test authentication failures."""
  379. # Wrong password
  380. with self.assertRaises(paramiko.AuthenticationException):
  381. self._run_command("should_fail", password="wrong_password")
  382. # Wrong key
  383. wrong_key = paramiko.RSAKey.generate(2048)
  384. with self.assertRaises(paramiko.AuthenticationException):
  385. self._run_command("should_fail", pkey=wrong_key)
  386. def test_connection_errors(self) -> None:
  387. """Test various connection errors."""
  388. vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)
  389. # Non-existent port
  390. with self.assertRaises((OSError, ConnectionRefusedError)):
  391. vendor.run_command(
  392. "127.0.0.1", "fail", username=USER, port=65432, password=PASSWORD
  393. )
  394. # Invalid hostname
  395. with self.assertRaises((socket.gaierror, OSError)):
  396. vendor.run_command(
  397. "invalid.hostname.example.com", "fail", username=USER, password=PASSWORD
  398. )
  399. def test_data_transfer(self) -> None:
  400. """Test various data transfer scenarios."""
  401. con = self._run_command("test_data", password=PASSWORD)
  402. # Large data (10KB)
  403. large_data = b"X" * 10240
  404. self._test_echo(con, large_data)
  405. # Binary data with all byte values
  406. binary_data = bytes(range(256))
  407. self._test_echo(con, binary_data)
  408. con.close()
  409. def test_multiple_connections(self) -> None:
  410. """Test multiple sequential connections."""
  411. # First connection
  412. con1 = self._run_command("test_connection_1", password=PASSWORD)
  413. self._test_echo(con1, b"first\n")
  414. con1.close()
  415. # Second connection
  416. con2 = self._run_command("test_connection_2", password=PASSWORD)
  417. self._test_echo(con2, b"second\n")
  418. con2.close()
  419. # Verify both commands were recorded
  420. self.assertIn(b"test_connection_1", self.ssh_server.commands)
  421. self.assertIn(b"test_connection_2", self.ssh_server.commands)
  422. def test_key_from_file(self) -> None:
  423. """Test authentication using key file."""
  424. with tempfile.NamedTemporaryFile(mode="w", suffix=".key", delete=False) as f:
  425. f.write(CLIENT_KEY)
  426. key_path = f.name
  427. try:
  428. con = self._run_command("test_key_from_file", key_filename=key_path)
  429. self.assertIn(b"test_key_from_file", self.ssh_server.commands)
  430. self._test_echo(con, b"file_key_test\n")
  431. con.close()
  432. finally:
  433. os.unlink(key_path)
  434. def test_protocol_versions(self) -> None:
  435. """Test protocol version handling."""
  436. # Protocol version 2 (default)
  437. con = self._run_command(
  438. "test_protocol_v2", password=PASSWORD, protocol_version=2
  439. )
  440. self.assertIn(b"test_protocol_v2", self.ssh_server.commands)
  441. con.close()
  442. # Protocol version 1
  443. con = self._run_command(
  444. "test_protocol_v1", password=PASSWORD, protocol_version=1
  445. )
  446. self.assertIn(b"test_protocol_v1", self.ssh_server.commands)
  447. con.close()
  448. def test_vendor_options(self) -> None:
  449. """Test vendor initialization options."""
  450. # Test with timeout
  451. vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False, timeout=1)
  452. con = vendor.run_command(
  453. "127.0.0.1",
  454. "test_timeout",
  455. username=USER,
  456. port=self.ssh_server.port,
  457. password=PASSWORD,
  458. )
  459. self.assertIn(b"test_timeout", self.ssh_server.commands)
  460. con.close()
  461. def test_can_read(self) -> None:
  462. """Test can_read functionality."""
  463. con = self._run_command("test_can_read", password=PASSWORD)
  464. # Check can_read returns bool
  465. self.assertIsInstance(con.can_read(), bool)
  466. # Send data and verify echo
  467. self._test_echo(con, b"test_data\n")
  468. con.close()
  469. def test_partial_reads(self) -> None:
  470. """Test reading data in small chunks."""
  471. con = self._run_command("test_partial", password=PASSWORD)
  472. test_data = b"0123456789" * 10 # 100 bytes
  473. con.write(test_data)
  474. # Read in 10-byte chunks
  475. received_data = b""
  476. while len(received_data) < len(test_data):
  477. chunk = con.read(10)
  478. if not chunk:
  479. break
  480. received_data += chunk
  481. self.assertEqual(test_data, received_data)
  482. con.close()
  483. def test_ssh_config_integration(self) -> None:
  484. """Test SSH config integration."""
  485. with tempfile.NamedTemporaryFile(mode="w", suffix=".config", delete=False) as f:
  486. f.write(f"""
  487. Host testserver
  488. HostName 127.0.0.1
  489. User {USER}
  490. Port {self.ssh_server.port}
  491. """)
  492. config_path = f.name
  493. try:
  494. with patch(
  495. "dulwich.contrib.paramiko_vendor.os.path.expanduser"
  496. ) as mock_expanduser:
  497. mock_expanduser.side_effect = (
  498. lambda p: config_path if p == "~/.ssh/config" else p
  499. )
  500. vendor = ParamikoSSHVendor(allow_agent=False, look_for_keys=False)
  501. con = vendor.run_command(
  502. "testserver", "test_ssh_config", password=PASSWORD
  503. )
  504. self.assertIn(b"test_ssh_config", self.ssh_server.commands)
  505. self._test_echo(con, b"config_test\n")
  506. con.close()
  507. finally:
  508. os.unlink(config_path)