Преглед изворни кода

Rework server protocol to be smarter and interoperate with cgit client.

This largely consists of correctly implementing multi-ack and
single-ack mode on the client side as well as stopping graph walking
when a sufficient set of common commits is found. Tests are included,
and the implementation has been lightly tested against the cgit
client. However, the dulwich server does not interoperate with the
dulwich client anymore; this will be fixed in a future change.

This change also preserves the GraphWalker interface so as not to
break hg-git or bzr-git.

Change-Id: Ia63e7fd0da9ff17c9f3f546149b474eb4f8bb466
Dave Borowitz пре 15 година
родитељ
комит
49d25d7079
4 измењених фајлова са 622 додато и 59 уклоњено
  1. 30 5
      dulwich/protocol.py
  2. 224 52
      dulwich/server.py
  3. 13 2
      dulwich/tests/test_protocol.py
  4. 355 0
      dulwich/tests/test_server.py

+ 30 - 5
dulwich/protocol.py

@@ -28,6 +28,9 @@ from dulwich.errors import (
 
 TCP_GIT_PORT = 9418
 
+SINGLE_ACK = 0
+MULTI_ACK = 1
+
 class ProtocolFile(object):
     """
     Some network ops are like file ops. The file ops expect to operate on
@@ -160,11 +163,33 @@ def extract_capabilities(text):
     """Extract a capabilities list from a string, if present.
 
     :param text: String to extract from
-    :return: Tuple with text with capabilities removed and list of 
-        capabilities or None (if no capabilities were present.
+    :return: Tuple with text with capabilities removed and list of capabilities
     """
     if not "\0" in text:
-        return text, None
-    capabilities = text.split("\0")
-    return (capabilities[0], capabilities[1:])
+        return text, []
+    text, capabilities = text.rstrip().split("\0")
+    return (text, capabilities.split(" "))
+
+
+def extract_want_line_capabilities(text):
+    """Extract a capabilities list from a want line, if present.
+
+    Note that want lines have capabilities separated from the rest of the line
+    by a space instead of a null byte. Thus want lines have the form:
+
+        want obj-id cap1 cap2 ...
+
+    :param text: Want line to extract from
+    :return: Tuple with text with capabilities removed and list of capabilities
+    """
+    split_text = text.rstrip().split(" ")
+    if len(split_text) < 3:
+        return text, []
+    return (" ".join(split_text[:2]), split_text[2:])
+
 
+def ack_type(capabilities):
+    """Extract the ack type from a capabilities list."""
+    if 'multi_ack' in capabilities:
+        return MULTI_ACK
+    return SINGLE_ACK

+ 224 - 52
dulwich/server.py

@@ -20,14 +20,25 @@
 """Git smart network protocol server implementation."""
 
 
+import collections
 import SocketServer
 import tempfile
 
+from dulwich.errors import (
+    GitProtocolError,
+    )
+from dulwich.objects import (
+    hex_to_sha,
+    )
 from dulwich.protocol import (
     Protocol,
     ProtocolFile,
     TCP_GIT_PORT,
     extract_capabilities,
+    extract_want_line_capabilities,
+    SINGLE_ACK,
+    MULTI_ACK,
+    ack_type,
     )
 from dulwich.repo import (
     Repo,
@@ -73,6 +84,7 @@ class GitBackend(Backend):
             Repo.create(self.gitdir)
 
         self.repo = Repo(self.gitdir)
+        self.object_store = self.repo.object_store
         self.fetch_objects = self.repo.fetch_objects
         self.get_refs = self.repo.get_refs
 
@@ -106,12 +118,32 @@ class Handler(object):
 class UploadPackHandler(Handler):
     """Protocol handler for uploading a pack to the server."""
 
+    def __init__(self, backend, read, write):
+        Handler.__init__(self, backend, read, write)
+        self._client_capabilities = None
+        self._graph_walker = None
+
     def default_capabilities(self):
         return ("multi_ack", "side-band-64k", "thin-pack", "ofs-delta")
 
+    def set_client_capabilities(self, caps):
+        my_caps = self.default_capabilities()
+        for cap in caps:
+            if '_ack' in cap and cap not in my_caps:
+                raise GitProtocolError('Client asked for capability %s that '
+                                       'was not advertised.' % cap)
+        self._client_capabilities = caps
+
+    def get_client_capabilities(self):
+        return self._client_capabilities
+
+    client_capabilities = property(get_client_capabilities,
+                                   set_client_capabilities)
+
     def handle(self):
         def determine_wants(heads):
             keys = heads.keys()
+            values = set(heads.itervalues())
             if keys:
                 self.proto.write_pkt_line("%s %s\x00%s\n" % ( heads[keys[0]], keys[0], self.capabilities()))
                 for k in keys[1:]:
@@ -126,65 +158,29 @@ class UploadPackHandler(Handler):
             if want == None:
                 return []
 
-            want, self.client_capabilities = extract_capabilities(want)
+            want, self.client_capabilities = extract_want_line_capabilities(want)
+            graph_walker.set_ack_type(ack_type(self.client_capabilities))
 
             want_revs = []
             while want and want[:4] == 'want':
-                want_revs.append(want[5:45])
+                sha = want[5:45]
+                try:
+                    hex_to_sha(sha)
+                except (TypeError, AssertionError), e:
+                    raise GitProtocolError(e)
+
+                if sha not in values:
+                    raise GitProtocolError(
+                        'Client wants invalid object %s' % sha)
+                want_revs.append(sha)
                 want = self.proto.read_pkt_line()
-                if want == None:
-                    self.proto.write_pkt_line("ACK %s\n" % want_revs[-1])
+            graph_walker.set_wants(want_revs)
             return want_revs
 
         progress = lambda x: self.proto.write_sideband(2, x)
         write = lambda x: self.proto.write_sideband(1, x)
 
-        class ProtocolGraphWalker(object):
-
-            def __init__(self, proto):
-                self.proto = proto
-                self._last_sha = None
-                self._cached = False
-                self._cache = []
-                self._cache_index = 0
-
-            def ack(self, have_ref):
-                self.proto.write_pkt_line("ACK %s continue\n" % have_ref)
-
-            def reset(self):
-                self._cached = True
-                self._cache_index = 0
-
-            def next(self):
-                if not self._cached:
-                    return self.next_from_proto()
-                self._cache_index = self._cache_index + 1
-                if self._cache_index > len(self._cache):
-                    return None
-                return self._cache[self._cache_index]
-
-            def next_from_proto(self):
-                have = self.proto.read_pkt_line()
-                if have is None:
-                    self.proto.write_pkt_line("ACK %s\n" % self._last_sha)
-                    return None
-
-                if have[:4] == 'have':
-                    self._cache.append(have[5:45])
-                    return have[5:45]
-
-
-                #if have[:4] == 'done':
-                #    return None
-
-                if self._last_sha:
-                    # Oddness: Git seems to resend the last ACK, without the "continue" statement
-                    self.proto.write_pkt_line("ACK %s\n" % self._last_sha)
-
-                # The exchange finishes with a NAK
-                self.proto.write_pkt_line("NAK\n")
-
-        graph_walker = ProtocolGraphWalker(self.proto)
+        graph_walker = ProtocolGraphWalker(self.backend.object_store, self.proto)
         objects_iter = self.backend.fetch_objects(determine_wants, graph_walker, progress)
 
         # Do they want any objects?
@@ -200,6 +196,184 @@ class UploadPackHandler(Handler):
         self.proto.write("0000")
 
 
+class ProtocolGraphWalker(object):
+    """A graph walker that knows the git protocol.
+
+    As a graph walker, this class implements ack(), next(), and reset(). It also
+    contains some base methods for interacting with the wire and walking the
+    commit tree.
+
+    The work of determining which acks to send is passed on to the
+    implementation instance stored in _impl. The reason for this is that we do
+    not know at object creation time what ack level the protocol requires. A
+    call to set_ack_level() is required to set up the implementation, before any
+    calls to next() or ack() are made.
+    """
+    def __init__(self, object_store, proto):
+        self.store = object_store
+        self.proto = proto
+        self._wants = []
+        self._cached = False
+        self._cache = []
+        self._cache_index = 0
+        self._impl = None
+
+    def ack(self, have_ref):
+        return self._impl.ack(have_ref)
+
+    def reset(self):
+        self._cached = True
+        self._cache_index = 0
+
+    def next(self):
+        if not self._cached:
+            return self._impl.next()
+        self._cache_index += 1
+        if self._cache_index > len(self._cache):
+            return None
+        return self._cache[self._cache_index]
+
+    def read_proto_line(self):
+        """Read a line from the wire.
+
+        :return: a tuple having one of the following forms:
+            ('have', obj_id)
+            ('done', None)
+            (None, None)  (for a flush-pkt)
+        """
+        line = self.proto.read_pkt_line()
+        if not line:
+            return (None, None)
+        fields = line.rstrip('\n').split(' ', 1)
+        if len(fields) == 1 and fields[0] == 'done':
+            return ('done', None)
+        if len(fields) == 2 and fields[0] == 'have':
+            try:
+                hex_to_sha(fields[1])
+                return fields
+            except (TypeError, AssertionError), e:
+                raise GitProtocolError(e)
+        raise GitProtocolError('Received invalid line from client:\n%s' % line)
+
+    def send_ack(self, sha, ack_type=''):
+        if ack_type:
+            ack_type = ' %s' % ack_type
+        self.proto.write_pkt_line('ACK %s%s\n' % (sha, ack_type))
+
+    def send_nak(self):
+        self.proto.write_pkt_line('NAK\n')
+
+    def set_wants(self, wants):
+        self._wants = wants
+
+    def _is_satisfied(self, haves, want, earliest):
+        """Check whether a want is satisfied by a set of haves.
+
+        A want, typically a branch tip, is "satisfied" only if there exists a
+        path back from that want to one of the haves.
+
+        :param haves: A set of commits we know the client has.
+        :param want: The want to check satisfaction for.
+        :param earliest: A timestamp beyond which the search for haves will be
+            terminated, presumably because we're searching too far down the
+            wrong branch.
+        """
+        o = self.store[want]
+        pending = collections.deque([o])
+        while pending:
+            commit = pending.popleft()
+            if commit.id in haves:
+                return True
+            if not getattr(commit, 'get_parents', None):
+                # non-commit wants are assumed to be satisfied
+                continue
+            for parent in commit.get_parents():
+                parent_obj = self.store[parent]
+                # TODO: handle parents with later commit times than children
+                if parent_obj.commit_time >= earliest:
+                    pending.append(parent_obj)
+        return False
+
+    def all_wants_satisfied(self, haves):
+        """Check whether all the current wants are satisfied by a set of haves.
+
+        :param haves: A set of commits we know the client has.
+        :note: Wants are specified with set_wants rather than passed in since
+            in the current interface they are determined outside this class.
+        """
+        haves = set(haves)
+        earliest = min([self.store[h].commit_time for h in haves])
+        for want in self._wants:
+            if not self._is_satisfied(haves, want, earliest):
+                return False
+        return True
+
+    def set_ack_type(self, ack_type):
+        impl_classes = {
+            MULTI_ACK: MultiAckGraphWalkerImpl,
+            SINGLE_ACK: SingleAckGraphWalkerImpl,
+            }
+        self._impl = impl_classes[ack_type](self)
+
+
+class SingleAckGraphWalkerImpl(object):
+    """Graph walker implementation that speaks the single-ack protocol."""
+
+    def __init__(self, walker):
+        self.walker = walker
+        self._sent_ack = False
+
+    def ack(self, have_ref):
+        if not self._sent_ack:
+            self.walker.send_ack(have_ref)
+            self._sent_ack = True
+
+    def next(self):
+        command, sha = self.walker.read_proto_line()
+        if command in (None, 'done'):
+            if not self._sent_ack:
+                self.walker.send_nak()
+            return None
+        elif command == 'have':
+            return sha
+
+
+class MultiAckGraphWalkerImpl(object):
+    """Graph walker implementation that speaks the multi-ack protocol."""
+
+    def __init__(self, walker):
+        self.walker = walker
+        self._found_base = False
+        self._common = []
+
+    def ack(self, have_ref):
+        self._common.append(have_ref)
+        if not self._found_base:
+            self.walker.send_ack(have_ref, 'continue')
+            if self.walker.all_wants_satisfied(self._common):
+                self._found_base = True
+        # else we blind ack within next
+
+    def next(self):
+        command, sha = self.walker.read_proto_line()
+        if command is None:
+            self.walker.send_nak()
+            return None
+        elif command == 'done':
+            # don't nak unless no common commits were found, even if not
+            # everything is satisfied
+            if self._common:
+                self.walker.send_ack(self._common[-1])
+            else:
+                self.walker.send_nak()
+            return None
+        elif command == 'have':
+            if self._found_base:
+                # blind ack
+                self.walker.send_ack(sha, 'continue')
+            return sha
+
+
 class ReceivePackHandler(Handler):
     """Protocol handler for downloading a pack to the client."""
 
@@ -267,5 +441,3 @@ class TCPGitServer(SocketServer.TCPServer):
     def __init__(self, backend, listen_addr, port=TCP_GIT_PORT):
         self.backend = backend
         SocketServer.TCPServer.__init__(self, (listen_addr, port), TCPGitRequestHandler)
-
-

+ 13 - 2
dulwich/tests/test_protocol.py

@@ -26,6 +26,7 @@ from unittest import TestCase
 from dulwich.protocol import (
     Protocol,
     extract_capabilities,
+    extract_want_line_capabilities,
     )
 
 class ProtocolTests(TestCase):
@@ -80,7 +81,17 @@ class ProtocolTests(TestCase):
 class ExtractCapabilitiesTestCase(TestCase):
 
     def test_plain(self):
-        self.assertEquals(("bla", None), extract_capabilities("bla"))
+        self.assertEquals(("bla", []), extract_capabilities("bla"))
 
     def test_caps(self):
-        self.assertEquals(("bla", ["la", "la"]), extract_capabilities("bla\0la\0la"))
+        self.assertEquals(("bla", ["la"]), extract_capabilities("bla\0la"))
+        self.assertEquals(("bla", ["la"]), extract_capabilities("bla\0la\n"))
+        self.assertEquals(("bla", ["la", "la"]), extract_capabilities("bla\0la la"))
+
+    def test_plain_want_line(self):
+        self.assertEquals(("want bla", []), extract_want_line_capabilities("want bla"))
+
+    def test_caps_want_line(self):
+        self.assertEquals(("want bla", ["la"]), extract_want_line_capabilities("want bla la"))
+        self.assertEquals(("want bla", ["la"]), extract_want_line_capabilities("want bla la\n"))
+        self.assertEquals(("want bla", ["la", "la"]), extract_want_line_capabilities("want bla la la"))

+ 355 - 0
dulwich/tests/test_server.py

@@ -0,0 +1,355 @@
+# test_server.py -- Tests for the git server
+# Copyright (C) 2010 Google, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; version 2
+# or (at your option) any later version of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA  02110-1301, USA.
+
+
+"""Tests for the smart protocol server."""
+
+
+from cStringIO import StringIO
+from unittest import TestCase
+
+from dulwich.errors import (
+    GitProtocolError,
+    )
+from dulwich.server import (
+    UploadPackHandler,
+    ProtocolGraphWalker,
+    SingleAckGraphWalkerImpl,
+    MultiAckGraphWalkerImpl,
+    )
+
+from dulwich.protocol import (
+    SINGLE_ACK,
+    MULTI_ACK,
+    )
+
+ONE = '1' * 40
+TWO = '2' * 40
+THREE = '3' * 40
+FOUR = '4' * 40
+FIVE = '5' * 40
+
+class TestProto(object):
+    def __init__(self):
+        self._output = []
+        self._received = {0: [], 1: [], 2: [], 3: []}
+
+    def set_output(self, output_lines):
+        self._output = ['%s\n' % line.rstrip() for line in output_lines]
+
+    def read_pkt_line(self):
+        if self._output:
+            return self._output.pop(0)
+        else:
+            return None
+
+    def write_sideband(self, band, data):
+        self._received[band].append(data)
+
+    def write_pkt_line(self, data):
+        if data is None:
+            data = 'None'
+        self._received[0].append(data)
+
+    def get_received_line(self, band=0):
+        lines = self._received[band]
+        if lines:
+            return lines.pop(0)
+        else:
+            return None
+
+
+class UploadPackHandlerTestCase(TestCase):
+    def setUp(self):
+        self._handler = UploadPackHandler(None, None, None)
+
+    def test_set_client_capabilities(self):
+        try:
+            self._handler.set_client_capabilities([])
+        except GitProtocolError:
+            self.fail()
+
+        try:
+            self._handler.set_client_capabilities([
+                'multi_ack', 'side-band-64k', 'thin-pack', 'ofs-delta'])
+        except GitProtocolError:
+            self.fail()
+
+    def test_set_client_capabilities_error(self):
+        self.assertRaises(GitProtocolError,
+                          self._handler.set_client_capabilities,
+                          ['weird_ack_level', 'ofs-delta'])
+        try:
+            self._handler.set_client_capabilities(['include-tag'])
+        except GitProtocolError:
+            self.fail()
+
+
+class TestCommit(object):
+    def __init__(self, sha, parents, commit_time):
+        self.id = sha
+        self._parents = parents
+        self.commit_time = commit_time
+
+    def get_parents(self):
+        return self._parents
+
+    def __repr__(self):
+        return '%s(%s)' % (self.__class__.__name__, self._sha)
+
+
+class ProtocolGraphWalkerTestCase(TestCase):
+    def setUp(self):
+        # Create the following commit tree:
+        #   3---5
+        #  /
+        # 1---2---4
+        self._objects = {
+            ONE: TestCommit(ONE, [], 111),
+            TWO: TestCommit(TWO, [ONE], 222),
+            THREE: TestCommit(THREE, [ONE], 333),
+            FOUR: TestCommit(FOUR, [TWO], 444),
+            FIVE: TestCommit(FIVE, [THREE], 555),
+            }
+        self._walker = ProtocolGraphWalker(self._objects, None)
+
+    def test_is_satisfied_no_haves(self):
+        self.assertFalse(self._walker._is_satisfied([], ONE, 0))
+        self.assertFalse(self._walker._is_satisfied([], TWO, 0))
+        self.assertFalse(self._walker._is_satisfied([], THREE, 0))
+
+    def test_is_satisfied_have_root(self):
+        self.assertTrue(self._walker._is_satisfied([ONE], ONE, 0))
+        self.assertTrue(self._walker._is_satisfied([ONE], TWO, 0))
+        self.assertTrue(self._walker._is_satisfied([ONE], THREE, 0))
+
+    def test_is_satisfied_have_branch(self):
+        self.assertTrue(self._walker._is_satisfied([TWO], TWO, 0))
+        # wrong branch
+        self.assertFalse(self._walker._is_satisfied([TWO], THREE, 0))
+
+    def test_all_wants_satisfied(self):
+        self._walker.set_wants([FOUR, FIVE])
+        # trivial case: wants == haves
+        self.assertTrue(self._walker.all_wants_satisfied([FOUR, FIVE]))
+        # cases that require walking the commit tree
+        self.assertTrue(self._walker.all_wants_satisfied([ONE]))
+        self.assertFalse(self._walker.all_wants_satisfied([TWO]))
+        self.assertFalse(self._walker.all_wants_satisfied([THREE]))
+        self.assertTrue(self._walker.all_wants_satisfied([TWO, THREE]))
+
+    # TODO: test commit time cutoff
+
+
+class TestProtocolGraphWalker(object):
+    def __init__(self):
+        self.acks = []
+        self.lines = []
+        self.done = False
+
+    def read_proto_line(self):
+        return self.lines.pop(0)
+
+    def send_ack(self, sha, ack_type=''):
+        self.acks.append((sha, ack_type))
+
+    def send_nak(self):
+        self.acks.append((None, 'nak'))
+
+    def all_wants_satisfied(self, haves):
+        return self.done
+
+    def pop_ack(self):
+        if not self.acks:
+            return None
+        return self.acks.pop(0)
+
+
+class AckGraphWalkerImplTestCase(TestCase):
+    """Base setup and asserts for AckGraphWalker tests."""
+    def setUp(self):
+        self._walker = TestProtocolGraphWalker()
+        self._walker.lines = [
+            ('have', TWO),
+            ('have', ONE),
+            ('have', THREE),
+            ('done', None),
+            ]
+        self._impl = self.impl_cls(self._walker)
+
+    def assertNoAck(self):
+        self.assertEquals(None, self._walker.pop_ack())
+
+    def assertAck(self, sha, ack_type=''):
+        self.assertEquals((sha, ack_type), self._walker.pop_ack())
+        self.assertNoAck()
+
+    def assertNak(self):
+        self.assertAck(None, 'nak')
+
+    def assertNextEquals(self, sha):
+        self.assertEquals(sha, self._impl.next())
+
+
+class SingleAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
+    impl_cls = SingleAckGraphWalkerImpl
+
+    def test_single_ack(self):
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self._walker.done = True
+        self._impl.ack(ONE)
+        self.assertAck(ONE)
+
+        self.assertNextEquals(THREE)
+        self._impl.ack(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNoAck()
+
+    def test_single_ack_flush(self):
+        # same as ack test but ends with a flush-pkt instead of done
+        self._walker.lines[-1] = (None, None)
+
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self._walker.done = True
+        self._impl.ack(ONE)
+        self.assertAck(ONE)
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNoAck()
+
+    def test_single_ack_nak(self):
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self.assertNoAck()
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNak()
+
+    def test_single_ack_nak_flush(self):
+        # same as nak test but ends with a flush-pkt instead of done
+        self._walker.lines[-1] = (None, None)
+
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self.assertNoAck()
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNak()
+
+class MultiAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
+    impl_cls = MultiAckGraphWalkerImpl
+
+    def test_multi_ack(self):
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self._walker.done = True
+        self._impl.ack(ONE)
+        self.assertAck(ONE, 'continue')
+
+        self.assertNextEquals(THREE)
+        self._impl.ack(THREE)
+        self.assertAck(THREE, 'continue')
+
+        self.assertNextEquals(None)
+        self.assertAck(THREE)
+
+    def test_multi_ack_partial(self):
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self._impl.ack(ONE)
+        self.assertAck(ONE, 'continue')
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        # done, re-send ack of last common
+        self.assertAck(ONE)
+
+    def test_multi_ack_flush(self):
+        # same as ack test but ends with a flush-pkt instead of done
+        self._walker.lines[-1] = (None, None)
+
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self._walker.done = True
+        self._impl.ack(ONE)
+        self.assertAck(ONE, 'continue')
+
+        self.assertNextEquals(THREE)
+        self._impl.ack(THREE)
+        self.assertAck(THREE, 'continue')
+
+        self.assertNextEquals(None)
+        self.assertNak()
+
+    def test_multi_ack_nak(self):
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self.assertNoAck()
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNak()
+
+    def test_multi_ack_nak_flush(self):
+        # same as nak test but ends with a flush-pkt instead of done
+        self._walker.lines[-1] = (None, None)
+
+        self.assertNextEquals(TWO)
+        self.assertNoAck()
+
+        self.assertNextEquals(ONE)
+        self.assertNoAck()
+
+        self.assertNextEquals(THREE)
+        self.assertNoAck()
+
+        self.assertNextEquals(None)
+        self.assertNak()