client.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. # client.py -- Implementation of the client side git protocols
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
  3. # Copyright (C) 2008 John Carr
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; either version 2
  8. # or (at your option) a later version of the License.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
"""Client side support for the Git protocol.

Provides GitClient together with transports for git:// (TCPGitClient),
SSH (SSHGitClient) and local subprocesses (SubprocessGitClient);
get_transport_and_path picks a client for a URI or path.
"""
# Docstrings in this module use reStructuredText field lists (:param:, :raise:).
__docformat__ = 'restructuredText'
  21. import select
  22. import socket
  23. import subprocess
  24. import urlparse
  25. from dulwich.errors import (
  26. GitProtocolError,
  27. SendPackError,
  28. UpdateRefsError,
  29. )
  30. from dulwich.protocol import (
  31. PktLineParser,
  32. Protocol,
  33. TCP_GIT_PORT,
  34. ZERO_SHA,
  35. extract_capabilities,
  36. )
  37. from dulwich.pack import (
  38. write_pack_objects,
  39. )
  40. # Python 2.6.6 included these in urlparse.uses_netloc upstream. Do
  41. # monkeypatching to enable similar behaviour in earlier Pythons:
  42. for scheme in ('git', 'git+ssh'):
  43. if scheme not in urlparse.uses_netloc:
  44. urlparse.uses_netloc.append(scheme)
  45. def _fileno_can_read(fileno):
  46. """Check if a file descriptor is readable."""
  47. return len(select.select([fileno], [], [], 0)[0]) > 0
  48. COMMON_CAPABILITIES = ['ofs-delta', 'side-band-64k']
  49. FETCH_CAPABILITIES = ['multi_ack'] + COMMON_CAPABILITIES
  50. SEND_CAPABILITIES = ['report-status'] + COMMON_CAPABILITIES
  51. class ReportStatusParser(object):
  52. """Handle status as reported by servers with the 'report-status' capability.
  53. """
  54. def __init__(self):
  55. self._done = False
  56. self._pack_status = None
  57. self._ref_status_ok = True
  58. self._ref_statuses = []
  59. def check(self):
  60. """Check if there were any errors and, if so, raise exceptions.
  61. :raise SendPackError: Raised when the server could not unpack
  62. :raise UpdateRefsError: Raised when refs could not be updated
  63. """
  64. if self._pack_status not in ('unpack ok', None):
  65. raise SendPackError(self._pack_status)
  66. if not self._ref_status_ok:
  67. ref_status = {}
  68. ok = set()
  69. for status in self._ref_statuses:
  70. if ' ' not in status:
  71. # malformed response, move on to the next one
  72. continue
  73. status, ref = status.split(' ', 1)
  74. if status == 'ng':
  75. if ' ' in ref:
  76. ref, status = ref.split(' ', 1)
  77. else:
  78. ok.add(ref)
  79. ref_status[ref] = status
  80. raise UpdateRefsError('%s failed to update' %
  81. ', '.join([ref for ref in ref_status
  82. if ref not in ok]),
  83. ref_status=ref_status)
  84. def handle_packet(self, pkt):
  85. """Handle a packet.
  86. :raise GitProtocolError: Raised when packets are received after a
  87. flush packet.
  88. """
  89. if self._done:
  90. raise GitProtocolError("received more data after status report")
  91. if pkt is None:
  92. self._done = True
  93. return
  94. if self._pack_status is None:
  95. self._pack_status = pkt.strip()
  96. else:
  97. ref_status = pkt.strip()
  98. self._ref_statuses.append(ref_status)
  99. if not ref_status.startswith('ok '):
  100. self._ref_status_ok = False
  101. # TODO(durin42): this doesn't correctly degrade if the server doesn't
  102. # support some capabilities. This should work properly with servers
  103. # that don't support multi_ack.
  104. class GitClient(object):
  105. """Git smart server client.
  106. """
  107. def __init__(self, thin_packs=True, report_activity=None):
  108. """Create a new GitClient instance.
  109. :param thin_packs: Whether or not thin packs should be retrieved
  110. :param report_activity: Optional callback for reporting transport
  111. activity.
  112. """
  113. self._report_activity = report_activity
  114. self._fetch_capabilities = list(FETCH_CAPABILITIES)
  115. self._send_capabilities = list(SEND_CAPABILITIES)
  116. if thin_packs:
  117. self._fetch_capabilities.append('thin-pack')
  118. def _connect(self, cmd, path):
  119. """Create a connection to the server.
  120. This method is abstract - concrete implementations should
  121. implement their own variant which connects to the server and
  122. returns an initialized Protocol object with the service ready
  123. for use and a can_read function which may be used to see if
  124. reads would block.
  125. :param cmd: The git service name to which we should connect.
  126. :param path: The path we should pass to the service.
  127. """
  128. raise NotImplementedError()
  129. def _read_refs(self, proto):
  130. server_capabilities = None
  131. refs = {}
  132. # Receive refs from server
  133. for pkt in proto.read_pkt_seq():
  134. (sha, ref) = pkt.rstrip('\n').split(' ', 1)
  135. if sha == 'ERR':
  136. raise GitProtocolError(ref)
  137. if server_capabilities is None:
  138. (ref, server_capabilities) = extract_capabilities(ref)
  139. refs[ref] = sha
  140. return refs, server_capabilities
  141. def send_pack(self, path, determine_wants, generate_pack_contents,
  142. progress=None):
  143. """Upload a pack to a remote repository.
  144. :param path: Repository path
  145. :param generate_pack_contents: Function that can return a sequence of the
  146. shas of the objects to upload.
  147. :param progress: Optional callback called with progress updates
  148. :raises SendPackError: if server rejects the pack data
  149. :raises UpdateRefsError: if the server supports report-status
  150. and rejects ref updates
  151. """
  152. proto, unused_can_read = self._connect('receive-pack', path)
  153. old_refs, server_capabilities = self._read_refs(proto)
  154. negotiated_capabilities = list(self._send_capabilities)
  155. if 'report-status' not in server_capabilities:
  156. negotiated_capabilities.remove('report-status')
  157. new_refs = determine_wants(old_refs)
  158. if not new_refs:
  159. proto.write_pkt_line(None)
  160. return {}
  161. want = []
  162. have = [x for x in old_refs.values() if not x == ZERO_SHA]
  163. sent_capabilities = False
  164. for refname in set(new_refs.keys() + old_refs.keys()):
  165. old_sha1 = old_refs.get(refname, ZERO_SHA)
  166. new_sha1 = new_refs.get(refname, ZERO_SHA)
  167. if old_sha1 != new_sha1:
  168. if sent_capabilities:
  169. proto.write_pkt_line('%s %s %s' % (old_sha1, new_sha1,
  170. refname))
  171. else:
  172. proto.write_pkt_line(
  173. '%s %s %s\0%s' % (old_sha1, new_sha1, refname,
  174. ' '.join(negotiated_capabilities)))
  175. sent_capabilities = True
  176. if new_sha1 not in have and new_sha1 != ZERO_SHA:
  177. want.append(new_sha1)
  178. proto.write_pkt_line(None)
  179. if not want:
  180. return new_refs
  181. objects = generate_pack_contents(have, want)
  182. entries, sha = write_pack_objects(proto.write_file(), objects)
  183. if 'report-status' in negotiated_capabilities:
  184. report_status_parser = ReportStatusParser()
  185. else:
  186. report_status_parser = None
  187. if "side-band-64k" in negotiated_capabilities:
  188. channel_callbacks = { 2: progress }
  189. if 'report-status' in negotiated_capabilities:
  190. channel_callbacks[1] = PktLineParser(
  191. report_status_parser.handle_packet).parse
  192. self._read_side_band64k_data(proto, channel_callbacks)
  193. else:
  194. if 'report-status':
  195. for pkt in proto.read_pkt_seq():
  196. report_status_parser.handle_packet(pkt)
  197. if report_status_parser is not None:
  198. report_status_parser.check()
  199. # wait for EOF before returning
  200. data = proto.read()
  201. if data:
  202. raise SendPackError('Unexpected response %r' % data)
  203. return new_refs
  204. def fetch(self, path, target, determine_wants=None, progress=None):
  205. """Fetch into a target repository.
  206. :param path: Path to fetch from
  207. :param target: Target repository to fetch into
  208. :param determine_wants: Optional function to determine what refs
  209. to fetch
  210. :param progress: Optional progress function
  211. :return: remote refs
  212. """
  213. if determine_wants is None:
  214. determine_wants = target.object_store.determine_wants_all
  215. f, commit = target.object_store.add_pack()
  216. try:
  217. return self.fetch_pack(path, determine_wants,
  218. target.get_graph_walker(), f.write, progress)
  219. finally:
  220. commit()
  221. def fetch_pack(self, path, determine_wants, graph_walker, pack_data,
  222. progress=None):
  223. """Retrieve a pack from a git smart server.
  224. :param determine_wants: Callback that returns list of commits to fetch
  225. :param graph_walker: Object with next() and ack().
  226. :param pack_data: Callback called for each bit of data in the pack
  227. :param progress: Callback for progress reports (strings)
  228. """
  229. proto, can_read = self._connect('upload-pack', path)
  230. (refs, server_capabilities) = self._read_refs(proto)
  231. negotiated_capabilities = list(self._fetch_capabilities)
  232. wants = determine_wants(refs)
  233. if not wants:
  234. proto.write_pkt_line(None)
  235. return refs
  236. assert isinstance(wants, list) and type(wants[0]) == str
  237. proto.write_pkt_line('want %s %s\n' % (
  238. wants[0], ' '.join(negotiated_capabilities)))
  239. for want in wants[1:]:
  240. proto.write_pkt_line('want %s\n' % want)
  241. proto.write_pkt_line(None)
  242. have = graph_walker.next()
  243. while have:
  244. proto.write_pkt_line('have %s\n' % have)
  245. if can_read():
  246. pkt = proto.read_pkt_line()
  247. parts = pkt.rstrip('\n').split(' ')
  248. if parts[0] == 'ACK':
  249. graph_walker.ack(parts[1])
  250. assert parts[2] == 'continue'
  251. have = graph_walker.next()
  252. proto.write_pkt_line('done\n')
  253. pkt = proto.read_pkt_line()
  254. while pkt:
  255. parts = pkt.rstrip('\n').split(' ')
  256. if parts[0] == 'ACK':
  257. graph_walker.ack(pkt.split(' ')[1])
  258. if len(parts) < 3 or parts[2] != 'continue':
  259. break
  260. pkt = proto.read_pkt_line()
  261. if "side-band-64k" in negotiated_capabilities:
  262. self._read_side_band64k_data(proto, {1: pack_data, 2: progress})
  263. # wait for EOF before returning
  264. data = proto.read()
  265. if data:
  266. raise Exception('Unexpected response %r' % data)
  267. else:
  268. # FIXME: Buffering?
  269. pack_data(self.read())
  270. return refs
  271. def _read_side_band64k_data(self, proto, channel_callbacks):
  272. """Read per-channel data.
  273. This requires the side-band-64k capability.
  274. :param proto: Protocol object to read from
  275. :param channel_callbacks: Dictionary mapping channels to packet
  276. handlers to use. None for a callback discards channel data.
  277. """
  278. for pkt in proto.read_pkt_seq():
  279. channel = ord(pkt[0])
  280. pkt = pkt[1:]
  281. try:
  282. cb = channel_callbacks[channel]
  283. except KeyError:
  284. raise AssertionError('Invalid sideband channel %d' % channel)
  285. else:
  286. if cb is not None:
  287. cb(pkt)
  288. class TCPGitClient(GitClient):
  289. """A Git Client that works over TCP directly (i.e. git://)."""
  290. def __init__(self, host, port=None, *args, **kwargs):
  291. if port is None:
  292. port = TCP_GIT_PORT
  293. self._host = host
  294. self._port = port
  295. GitClient.__init__(self, *args, **kwargs)
  296. def _connect(self, cmd, path):
  297. sockaddrs = socket.getaddrinfo(self._host, self._port,
  298. socket.AF_UNSPEC, socket.SOCK_STREAM)
  299. s = None
  300. err = socket.error("no address found for %s" % self._host)
  301. for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
  302. s = socket.socket(family, socktype, proto)
  303. s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  304. try:
  305. s.connect(sockaddr)
  306. break
  307. except socket.error, err:
  308. if s is not None:
  309. s.close()
  310. s = None
  311. if s is None:
  312. raise err
  313. # -1 means system default buffering
  314. rfile = s.makefile('rb', -1)
  315. # 0 means unbuffered
  316. wfile = s.makefile('wb', 0)
  317. proto = Protocol(rfile.read, wfile.write,
  318. report_activity=self._report_activity)
  319. if path.startswith("/~"):
  320. path = path[1:]
  321. proto.send_cmd('git-%s' % cmd, path, 'host=%s' % self._host)
  322. return proto, lambda: _fileno_can_read(s)
  323. class SubprocessWrapper(object):
  324. """A socket-like object that talks to a subprocess via pipes."""
  325. def __init__(self, proc):
  326. self.proc = proc
  327. self.read = proc.stdout.read
  328. self.write = proc.stdin.write
  329. def can_read(self):
  330. if subprocess.mswindows:
  331. from msvcrt import get_osfhandle
  332. from win32pipe import PeekNamedPipe
  333. handle = get_osfhandle(self.proc.stdout.fileno())
  334. return PeekNamedPipe(handle, 0)[2] != 0
  335. else:
  336. return _fileno_can_read(self.proc.stdout.fileno())
  337. def close(self):
  338. self.proc.stdin.close()
  339. self.proc.stdout.close()
  340. self.proc.wait()
  341. class SubprocessGitClient(GitClient):
  342. """Git client that talks to a server using a subprocess."""
  343. def __init__(self, *args, **kwargs):
  344. self._connection = None
  345. GitClient.__init__(self, *args, **kwargs)
  346. def _connect(self, service, path):
  347. import subprocess
  348. argv = ['git', service, path]
  349. p = SubprocessWrapper(
  350. subprocess.Popen(argv, bufsize=0, stdin=subprocess.PIPE,
  351. stdout=subprocess.PIPE))
  352. return Protocol(p.read, p.write,
  353. report_activity=self._report_activity), p.can_read
  354. class SSHVendor(object):
  355. def connect_ssh(self, host, command, username=None, port=None):
  356. import subprocess
  357. #FIXME: This has no way to deal with passwords..
  358. args = ['ssh', '-x']
  359. if port is not None:
  360. args.extend(['-p', str(port)])
  361. if username is not None:
  362. host = '%s@%s' % (username, host)
  363. args.append(host)
  364. proc = subprocess.Popen(args + command,
  365. stdin=subprocess.PIPE,
  366. stdout=subprocess.PIPE)
  367. return SubprocessWrapper(proc)
  368. # Can be overridden by users
  369. get_ssh_vendor = SSHVendor
  370. class SSHGitClient(GitClient):
  371. def __init__(self, host, port=None, username=None, *args, **kwargs):
  372. self.host = host
  373. self.port = port
  374. self.username = username
  375. GitClient.__init__(self, *args, **kwargs)
  376. self.alternative_paths = {}
  377. def _get_cmd_path(self, cmd):
  378. return self.alternative_paths.get(cmd, 'git-%s' % cmd)
  379. def _connect(self, cmd, path):
  380. con = get_ssh_vendor().connect_ssh(
  381. self.host, ["%s '%s'" % (self._get_cmd_path(cmd), path)],
  382. port=self.port, username=self.username)
  383. return (Protocol(con.read, con.write, report_activity=self._report_activity),
  384. con.can_read)
  385. def get_transport_and_path(uri):
  386. """Obtain a git client from a URI or path.
  387. :param uri: URI or path
  388. :return: Tuple with client instance and relative path.
  389. """
  390. parsed = urlparse.urlparse(uri)
  391. if parsed.scheme == 'git':
  392. return TCPGitClient(parsed.hostname, port=parsed.port), parsed.path
  393. elif parsed.scheme == 'git+ssh':
  394. return SSHGitClient(parsed.hostname, port=parsed.port,
  395. username=parsed.username), parsed.path
  396. if parsed.scheme and not parsed.netloc:
  397. # SSH with no user@, zero or one leading slash.
  398. return SSHGitClient(parsed.scheme), parsed.path
  399. elif parsed.scheme:
  400. raise ValueError('Unknown git protocol scheme: %s' % parsed.scheme)
  401. elif '@' in parsed.path and ':' in parsed.path:
  402. # SSH with user@host:foo.
  403. user_host, path = parsed.path.split(':')
  404. user, host = user_host.rsplit('@')
  405. return SSHGitClient(host, username=user), path
  406. # Otherwise, assume it's a local path.
  407. return SubprocessGitClient(), uri