瀏覽代碼

protocol: Add simple helper class for parsing a pktline stream.

This has some overlap with the pktline reading code in Protocol. The
implementation seemed simple enough not to worry about that, but I'd
love to hear ideas on how to share more code between them.
Jelmer Vernooij 13 年之前
父節點
當前提交
ed17980283
共有 2 個文件被更改,包括 60 次插入0 次删除
  1. 33 0
      dulwich/protocol.py
  2. 27 0
      dulwich/tests/test_protocol.py

+ 33 - 0
dulwich/protocol.py

@@ -406,3 +406,36 @@ class BufferedPktLineWriter(object):
             self._write(data)
         self._len = 0
         self._wbuf = StringIO()
+
+
+class PktLineParser(object):
+    """Packet line parser that hands completed packets off to a callback.
+    """
+
+    def __init__(self, handle_pkt):
+        self.handle_pkt = handle_pkt
+        self._readahead = StringIO()
+
+    def parse(self, data):
+        """Parse a fragment of data and call back for any completed packets.
+        """
+        self._readahead.write(data)
+        buf = self._readahead.getvalue()
+        if len(buf) < 4:
+            return
+        while len(buf) >= 4:
+            size = int(buf[:4], 16)
+            if size == 0:
+                self.handle_pkt(None)
+                buf = buf[4:]
+            elif size <= len(buf):
+                self.handle_pkt(buf[4:size])
+                buf = buf[size:]
+            else:
+                break
+        self._readahead = StringIO()
+        self._readahead.write(buf)
+
+    def get_tail(self):
+        """Read back any unused data."""
+        return self._readahead.getvalue()

+ 27 - 0
dulwich/tests/test_protocol.py

@@ -25,6 +25,7 @@ from dulwich.errors import (
     HangupException,
     )
 from dulwich.protocol import (
+    PktLineParser,
     Protocol,
     ReceivableProtocol,
     extract_capabilities,
@@ -280,3 +281,29 @@ class BufferedPktLineWriterTests(TestCase):
         self._writer.write('z')
         self._writer.flush()
         self.assertOutputEquals('0005z')
+
+
+class PktLineParserTests(TestCase):
+
+    def test_none(self):
+        pktlines = []
+        parser = PktLineParser(pktlines.append)
+        parser.parse("0000")
+        self.assertEquals(pktlines, [None])
+        self.assertEquals("", parser.get_tail())
+
+    def test_small_fragments(self):
+        pktlines = []
+        parser = PktLineParser(pktlines.append)
+        parser.parse("00")
+        parser.parse("05")
+        parser.parse("z0000")
+        self.assertEquals(pktlines, ["z", None])
+        self.assertEquals("", parser.get_tail())
+
+    def test_multiple_packets(self):
+        pktlines = []
+        parser = PktLineParser(pktlines.append)
+        parser.parse("0005z0006aba")
+        self.assertEquals(pktlines, ["z", "ab"])
+        self.assertEquals("a", parser.get_tail())