|
@@ -21,6 +21,9 @@
|
|
|
|
|
|
from StringIO import StringIO
|
|
|
|
|
|
+from dulwich.errors import (
|
|
|
+ HangupException,
|
|
|
+ )
|
|
|
from dulwich.protocol import (
|
|
|
Protocol,
|
|
|
ReceivableProtocol,
|
|
@@ -50,6 +53,24 @@ class BaseProtocolTests(object):
|
|
|
self.rin.seek(0)
|
|
|
self.assertEquals('cmd ', self.proto.read_pkt_line())
|
|
|
|
|
|
+ def test_eof(self):
|
|
|
+ self.rin.write('0000')
|
|
|
+ self.rin.seek(0)
|
|
|
+ self.assertFalse(self.proto.eof())
|
|
|
+ self.assertEquals(None, self.proto.read_pkt_line())
|
|
|
+ self.assertTrue(self.proto.eof())
|
|
|
+ self.assertRaises(HangupException, self.proto.read_pkt_line)
|
|
|
+
|
|
|
+ def test_unread_pkt_line(self):
|
|
|
+ self.rin.write('0007foo0000')
|
|
|
+ self.rin.seek(0)
|
|
|
+ self.assertEquals('foo', self.proto.read_pkt_line())
|
|
|
+ self.proto.unread_pkt_line('bar')
|
|
|
+ self.assertEquals('bar', self.proto.read_pkt_line())
|
|
|
+ self.assertEquals(None, self.proto.read_pkt_line())
|
|
|
+ self.proto.unread_pkt_line('baz1')
|
|
|
+ self.assertRaises(ValueError, self.proto.unread_pkt_line, 'baz2')
|
|
|
+
|
|
|
def test_read_pkt_seq(self):
|
|
|
self.rin.write('0008cmd 0005l0000')
|
|
|
self.rin.seek(0)
|
|
@@ -91,10 +112,14 @@ class ProtocolTests(BaseProtocolTests, TestCase):
|
|
|
class ReceivableStringIO(StringIO):
|
|
|
"""StringIO with socket-like recv semantics for testing."""
|
|
|
|
|
|
+ def __init__(self):
|
|
|
+ StringIO.__init__(self)
|
|
|
+ self.allow_read_past_eof = False
|
|
|
+
|
|
|
def recv(self, size):
|
|
|
# fail fast if no bytes are available; in a real socket, this would
|
|
|
# block forever
|
|
|
- if self.tell() == len(self.getvalue()):
|
|
|
+ if self.tell() == len(self.getvalue()) and not self.allow_read_past_eof:
|
|
|
raise AssertionError('Blocking read past end of socket')
|
|
|
if size == 1:
|
|
|
return self.read(1)
|
|
@@ -111,6 +136,13 @@ class ReceivableProtocolTests(BaseProtocolTests, TestCase):
|
|
|
self.proto = ReceivableProtocol(self.rin.recv, self.rout.write)
|
|
|
self.proto._rbufsize = 8
|
|
|
|
|
|
+ def test_eof(self):
|
|
|
+ # Allow blocking reads past EOF just for this test. The only parts of
|
|
|
+ # the protocol that might check for EOF do not depend on the recv()
|
|
|
+ # semantics anyway.
|
|
|
+ self.rin.allow_read_past_eof = True
|
|
|
+ BaseProtocolTests.test_eof(self)
|
|
|
+
|
|
|
def test_recv(self):
|
|
|
all_data = '1234567' * 10 # not a multiple of bufsize
|
|
|
self.rin.write(all_data)
|