test_asgi_websockets.py (1780:73699f41c956) test_asgi_websockets.py (1819:9344a39fb02e)
1import struct
2import time
3from distutils.version import LooseVersion
4
5import pytest
6from unit.applications.lang.python import TestApplicationPython
7from unit.applications.websockets import TestApplicationWebsocket
8from unit.option import option

--- 16 unchanged lines hidden (view full) ---

25
26 def close_connection(self, sock):
27 assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc'
28
29 self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close())
30
31 self.check_close(sock)
32
1import struct
2import time
3from distutils.version import LooseVersion
4
5import pytest
6from unit.applications.lang.python import TestApplicationPython
7from unit.applications.websockets import TestApplicationWebsocket
8from unit.option import option

--- 16 unchanged lines hidden (view full) ---

25
26 def close_connection(self, sock):
27 assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc'
28
29 self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close())
30
31 self.check_close(sock)
32
33 def check_close(self, sock, code=1000, no_close=False):
34 frame = self.ws.frame_read(sock)
33 def check_close(self, sock, code=1000, no_close=False, frame=None):
34 if frame == None:
35 frame = self.ws.frame_read(sock)
35
36 assert frame['fin'] == True, 'close fin'
37 assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode'
38 assert frame['code'] == code, 'close code'
39
40 if not no_close:
41 sock.close()
42

--- 882 unchanged lines hidden (view full) ---

925 # 5_15
926
927 _, sock, _ = self.ws.upgrade()
928
929 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False)
930 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True)
931 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False)
932 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True)
36
37 assert frame['fin'] == True, 'close fin'
38 assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode'
39 assert frame['code'] == code, 'close code'
40
41 if not no_close:
42 sock.close()
43

--- 882 unchanged lines hidden (view full) ---

926 # 5_15
927
928 _, sock, _ = self.ws.upgrade()
929
930 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False)
931 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True)
932 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False)
933 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True)
933 self.check_close(sock, 1002)
934
934
935 frame = self.ws.frame_read(sock)
936
937 if frame['opcode'] == self.ws.OP_TEXT:
938 self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2')
939 frame = None
940
941 self.check_close(sock, 1002, frame=frame)
942
935 # 5_16
936
937 _, sock, _ = self.ws.upgrade()
938
939 for i in range(0, 2):
940 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False)
941 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False)
942 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True)

--- 525 unchanged lines hidden ---
943 # 5_16
944
945 _, sock, _ = self.ws.upgrade()
946
947 for i in range(0, 2):
948 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False)
949 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False)
950 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True)

--- 525 unchanged lines hidden ---