|
@@ -296,6 +296,7 @@ class Subscriber:
|
|
self.connection = connection
|
|
self.connection = connection
|
|
self.acl = {}
|
|
self.acl = {}
|
|
self.server = server
|
|
self.server = server
|
|
|
|
+ self.lock = asyncio.Lock()
|
|
|
|
|
|
# Set topics subscribed to
|
|
# Set topics subscribed to
|
|
self.topics = [x for x in request.path.split('/') if x]
|
|
self.topics = [x for x in request.path.split('/') if x]
|
|
@@ -351,7 +352,8 @@ class Subscriber:
|
|
js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
|
|
js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
|
|
if self.old_school:
|
|
if self.old_school:
|
|
js += b"\0"
|
|
js += b"\0"
|
|
- await self.connection.write(js)
|
|
|
|
|
|
+ async with self.lock:
|
|
|
|
+ await self.connection.write(js)
|
|
|
|
|
|
|
|
|
|
class Payload:
|
|
class Payload:
|
|
@@ -391,9 +393,11 @@ class Payload:
|
|
if all(el in self.topics for el in sub.topics):
|
|
if all(el in self.topics for el in sub.topics):
|
|
try:
|
|
try:
|
|
if sub.old_school:
|
|
if sub.old_school:
|
|
- await sub.connection.write(ojs)
|
|
|
|
|
|
+ async with sub.lock:
|
|
|
|
+ await sub.connection.write(ojs)
|
|
else:
|
|
else:
|
|
- await sub.connection.write(js)
|
|
|
|
|
|
+ async with sub.lock:
|
|
|
|
+ await sub.connection.write(js)
|
|
except Exception:
|
|
except Exception:
|
|
bad_subs.append(sub)
|
|
bad_subs.append(sub)
|
|
return bad_subs
|
|
return bad_subs
|