|
@@ -30,10 +30,40 @@ from unittest import skipIf
|
|
|
try:
|
|
|
import paramiko
|
|
|
except ImportError:
|
|
|
- paramiko = None
|
|
|
+ has_paramiko = False
|
|
|
else:
|
|
|
+ has_paramiko = True
|
|
|
from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
|
|
|
|
|
|
+ class Server(paramiko.ServerInterface):
|
|
|
+ """http://docs.paramiko.org/en/2.4/api/server.html"""
|
|
|
+ def __init__(self, commands, *args, **kwargs):
|
|
|
+ super(Server, self).__init__(*args, **kwargs)
|
|
|
+ self.commands = commands
|
|
|
+
|
|
|
+ def check_channel_exec_request(self, channel, command):
|
|
|
+ self.commands.append(command)
|
|
|
+ return True
|
|
|
+
|
|
|
+ def check_auth_password(self, username, password):
|
|
|
+ if username == USER and password == PASSWORD:
|
|
|
+ return paramiko.AUTH_SUCCESSFUL
|
|
|
+ return paramiko.AUTH_FAILED
|
|
|
+
|
|
|
+ def check_auth_publickey(self, username, key):
|
|
|
+ pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
|
|
|
+ if username == USER and key == pubkey:
|
|
|
+ return paramiko.AUTH_SUCCESSFUL
|
|
|
+ return paramiko.AUTH_FAILED
|
|
|
+
|
|
|
+ def check_channel_request(self, kind, chanid):
|
|
|
+ if kind == "session":
|
|
|
+ return paramiko.OPEN_SUCCEEDED
|
|
|
+ return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
|
|
|
+
|
|
|
+ def get_allowed_auths(self, username):
|
|
|
+ return "password,publickey"
|
|
|
+
|
|
|
|
|
|
USER = 'testuser'
|
|
|
PASSWORD = 'test'
|
|
@@ -95,38 +125,7 @@ WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w==
|
|
|
-----END RSA PRIVATE KEY-----"""
|
|
|
|
|
|
|
|
|
-if paramiko is not None:
|
|
|
- class Server(paramiko.ServerInterface):
|
|
|
- """http://docs.paramiko.org/en/2.4/api/server.html"""
|
|
|
- def __init__(self, commands, *args, **kwargs):
|
|
|
- super(Server, self).__init__(*args, **kwargs)
|
|
|
- self.commands = commands
|
|
|
-
|
|
|
- def check_channel_exec_request(self, channel, command):
|
|
|
- self.commands.append(command)
|
|
|
- return True
|
|
|
-
|
|
|
- def check_auth_password(self, username, password):
|
|
|
- if username == USER and password == PASSWORD:
|
|
|
- return paramiko.AUTH_SUCCESSFUL
|
|
|
- return paramiko.AUTH_FAILED
|
|
|
-
|
|
|
- def check_auth_publickey(self, username, key):
|
|
|
- pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
|
|
|
- if username == USER and key == pubkey:
|
|
|
- return paramiko.AUTH_SUCCESSFUL
|
|
|
- return paramiko.AUTH_FAILED
|
|
|
-
|
|
|
- def check_channel_request(self, kind, chanid):
|
|
|
- if kind == "session":
|
|
|
- return paramiko.OPEN_SUCCEEDED
|
|
|
- return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
|
|
|
-
|
|
|
- def get_allowed_auths(self, username):
|
|
|
- return "password,publickey"
|
|
|
-
|
|
|
-
|
|
|
-@skipIf(paramiko is None, "paramiko is not installed")
|
|
|
+@skipIf(not has_paramiko, "paramiko is not installed")
|
|
|
class ParamikoSSHVendorTests(TestCase):
|
|
|
|
|
|
def setUp(self):
|