瀏覽代碼

Add ParamikoSSHVendor.

Aaron O'Mullan 11 年之前
父節點
當前提交
fcd6aa89ee
共有 2 個文件被更改,包括 105 次插入0 次删除
  1. 2 0
      NEWS
  2. 103 0
      dulwich/client.py

+ 2 - 0
NEWS

@@ -25,6 +25,8 @@
   * Add `MemoryObjectStore.add_pack` and `MemoryObjectStore.add_thin_pack` methods.
     (David Bennett)
 
+  * Add paramiko-based SSH vendor. (Aaron O'Mullan)
+
 0.9.0	2013-05-31
 
  BUG FIXES

+ 103 - 0
dulwich/client.py

@@ -671,6 +671,109 @@ class SubprocessSSHVendor(SSHVendor):
         return SubprocessWrapper(proc)
 
 
+try:
+    import paramiko
+except ImportError:
+    pass
+else:
+    import threading
+
+    class ParamikoWrapper(object):
+        STDERR_READ_N = 2048  # 2k
+
+        def __init__(self, client, channel, progress_stderr=None):
+            self.client = client
+            self.channel = channel
+            self.progress_stderr = progress_stderr
+            self.should_monitor = bool(progress_stderr) or True
+            self.monitor_thread = None
+            self.stderr = ''
+
+            # Channel must block
+            self.channel.setblocking(True)
+
+            # Start
+            if self.should_monitor:
+                self.monitor_thread = threading.Thread(target=self.monitor_stderr)
+                self.monitor_thread.start()
+
+        def monitor_stderr(self):
+            while self.should_monitor:
+                # Block and read
+                data = self.read_stderr(self.STDERR_READ_N)
+
+                # Socket closed
+                if not data:
+                    self.should_monitor = False
+                    break
+
+                # Emit data
+                if self.progress_stderr:
+                    self.progress_stderr(data)
+
+                # Append to buffer
+                self.stderr += data
+
+        def stop_monitoring(self):
+            # Stop StdErr thread
+            if self.should_monitor:
+                self.should_monitor = False
+                self.monitor_thread.join()
+
+                # Get left over data
+                data = self.channel.in_stderr_buffer.empty()
+                self.stderr += data
+
+        def can_read(self):
+            return self.channel.recv_ready()
+
+        def write(self, data):
+            return self.channel.sendall(data)
+
+        def read_stderr(self, n):
+            return self.channel.recv_stderr(n)
+
+        def read(self, n=None):
+            data = self.channel.recv(n)
+            data_len = len(data)
+
+            # Closed socket
+            if not data:
+                return
+
+            # Read more if needed
+            if n and data_len < n:
+                diff_len = n - data_len
+                return data + self.read(diff_len)
+            return data
+
+        def close(self):
+            self.channel.close()
+            self.stop_monitoring()
+
+        def __del__(self):
+            self.close()
+
+    class ParamikoSSHVendor(object):
+
+        def connect_ssh(self, host, command, username=None, port=None,
+                progress_stderr=None, **kwargs):
+            client = paramiko.SSHClient()
+
+            policy = paramiko.client.MissingHostKeyPolicy()
+            client.set_missing_host_key_policy(policy)
+            client.connect(host, username=username, port=port, **kwargs)
+
+            # Open SSH session
+            channel = client.get_transport().open_session()
+
+            # Run commands
+            apply(channel.exec_command, command)
+
+            return ParamikoWrapper(client, channel,
+                    progress_stderr=progress_stderr)
+
+
 # Can be overridden by users
 get_ssh_vendor = SubprocessSSHVendor