Deleted
Added
test_asgi_websockets.py (1780:73699f41c956) | test_asgi_websockets.py (1819:9344a39fb02e) |
---|---|
1import struct 2import time 3from distutils.version import LooseVersion 4 5import pytest 6from unit.applications.lang.python import TestApplicationPython 7from unit.applications.websockets import TestApplicationWebsocket 8from unit.option import option --- 16 unchanged lines hidden (view full) --- 25 26 def close_connection(self, sock): 27 assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' 28 29 self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) 30 31 self.check_close(sock) 32 | 1import struct 2import time 3from distutils.version import LooseVersion 4 5import pytest 6from unit.applications.lang.python import TestApplicationPython 7from unit.applications.websockets import TestApplicationWebsocket 8from unit.option import option --- 16 unchanged lines hidden (view full) --- 25 26 def close_connection(self, sock): 27 assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' 28 29 self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) 30 31 self.check_close(sock) 32 |
33 def check_close(self, sock, code=1000, no_close=False): 34 frame = self.ws.frame_read(sock) | 33 def check_close(self, sock, code=1000, no_close=False, frame=None): 34 if frame == None: 35 frame = self.ws.frame_read(sock) |
35 36 assert frame['fin'] == True, 'close fin' 37 assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' 38 assert frame['code'] == code, 'close code' 39 40 if not no_close: 41 sock.close() 42 --- 882 unchanged lines hidden (view full) --- 925 # 5_15 926 927 _, sock, _ = self.ws.upgrade() 928 929 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) 930 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) 931 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) 932 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True) | 36 37 assert frame['fin'] == True, 'close fin' 38 assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' 39 assert frame['code'] == code, 'close code' 40 41 if not no_close: 42 sock.close() 43 --- 882 unchanged lines hidden (view full) --- 926 # 5_15 927 928 _, sock, _ = self.ws.upgrade() 929 930 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) 931 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) 932 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) 933 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True) |
933 self.check_close(sock, 1002) | |
934 | 934 |
935 frame = self.ws.frame_read(sock) 936 937 if frame['opcode'] == self.ws.OP_TEXT: 938 self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') 939 frame = None 940 941 self.check_close(sock, 1002, frame=frame) 942 |
|
935 # 5_16 936 937 _, sock, _ = self.ws.upgrade() 938 939 for i in range(0, 2): 940 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False) 941 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) 942 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) --- 525 unchanged lines hidden --- | 943 # 5_16 944 945 _, sock, _ = self.ws.upgrade() 946 947 for i in range(0, 2): 948 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False) 949 self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) 950 self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) --- 525 unchanged lines hidden --- |