import re

import pytest
from unit.applications.lang.perl import TestApplicationPerl


class TestPerlApplication(TestApplicationPerl):
    """Functional tests for Perl (PSGI) applications served by Unit.

    Each test loads a named application through ``self.load()`` and then
    checks the HTTP response (status, headers, body) and, where relevant,
    log records obtained through ``self.wait_for_record()``.
    """

    # Requirement declaration read by the test framework.
    # NOTE(review): presumably "any available perl module version" - confirm.
    prerequisites = {'modules': {'perl': 'all'}}

    def test_perl_application(self):
        """POST to the 'variables' app and verify status, headers and body."""
        self.load('variables')

        body = 'Test body string.'

        resp = self.post(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': 'blah',
                'Connection': 'close',
            },
            body=body,
        )

        assert resp['status'] == 200, 'status'

        # Headers whose values vary between runs are popped and checked
        # individually so that the remainder can be compared exactly.
        headers = resp['headers']
        header_server = headers.pop('Server')
        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
        assert (
            headers.pop('Server-Software') == header_server
        ), 'server software header'

        date = headers.pop('Date')
        assert date[-4:] == ' GMT', 'date header timezone'
        # Allow a few seconds of skew between the response and the check.
        assert (
            abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
        ), 'date header'

        assert headers == {
            'Connection': 'close',
            'Content-Length': str(len(body)),
            'Content-Type': 'text/html',
            'Request-Method': 'POST',
            'Request-Uri': '/',
            'Http-Host': 'localhost',
            'Server-Protocol': 'HTTP/1.1',
            'Custom-Header': 'blah',
            'Psgi-Version': '11',
            'Psgi-Url-Scheme': 'http',
            'Psgi-Multithread': '',
            'Psgi-Multiprocess': '1',
            'Psgi-Run-Once': '',
            'Psgi-Nonblocking': '',
            'Psgi-Streaming': '1',
        }, 'headers'
        assert resp['body'] == body, 'body'

    def test_perl_application_query_string(self):
        """The query string of the request URL is reported verbatim."""
        self.load('query_string')

        resp = self.get(url='/?var1=val1&var2=val2')

        assert (
            resp['headers']['Query-String'] == 'var1=val1&var2=val2'
        ), 'Query-String header'

    def test_perl_application_query_string_empty(self):
        """A URL ending in a bare '?' yields an empty query string."""
        self.load('query_string')

        resp = self.get(url='/?')

        assert resp['status'] == 200, 'query string empty status'
        assert resp['headers']['Query-String'] == '', 'query string empty'

    def test_perl_application_query_string_absent(self):
        """A request made with the default URL yields an empty query string."""
        self.load('query_string')

        resp = self.get()

        assert resp['status'] == 200, 'query string absent status'
        assert resp['headers']['Query-String'] == '', 'query string absent'

    @pytest.mark.skip('not yet')
    def test_perl_application_server_port(self):
        """The 'Server-Port' header reports port 7080 (currently skipped)."""
        self.load('server_port')

        assert (
            self.get()['headers']['Server-Port'] == '7080'
        ), 'Server-Port header'

    def test_perl_application_input_read_empty(self):
        """The 'input_read_empty' app answers a GET with an empty body."""
        self.load('input_read_empty')

        assert self.get()['body'] == '', 'read empty'

    def test_perl_application_input_read_parts(self):
        """The 'input_read_parts' app returns the posted body unchanged."""
        self.load('input_read_parts')

        assert (
            self.post(body='0123456789')['body'] == '0123456789'
        ), 'input read parts'

    @pytest.mark.skip('not yet')
    def test_perl_application_input_read_offset(self):
        """The 'input_read_offset' app returns '4567' (currently skipped)."""
        self.load('input_read_offset')

        assert self.post(body='0123456789')['body'] == '4567', 'read offset'

    def test_perl_application_input_copy(self):
        """The 'input_copy' app returns the posted body unchanged."""
        self.load('input_copy')

        body = '0123456789'
        assert self.post(body=body)['body'] == body, 'input copy'

    def test_perl_application_errors_print(self):
        """The 'errors_print' app returns '1' and an error record is found."""
        self.load('errors_print')

        assert self.get()['body'] == '1', 'errors result'

        assert (
            self.wait_for_record(r'\[error\].+Error in application')
            is not None
        ), 'errors print'

    def test_perl_application_header_equal_names(self):
        """Two 'Set-Cookie' headers are reported as a two-element list."""
        self.load('header_equal_names')

        assert self.get()['headers']['Set-Cookie'] == [
            'tc=one,two,three',
            'tc=four,five,six',
        ], 'header equal names'

    def test_perl_application_header_pairs(self):
        """The 'header_pairs' app responds with a 'blah: blah' header."""
        self.load('header_pairs')

        assert self.get()['headers']['blah'] == 'blah', 'header pairs'

    def test_perl_application_body_empty(self):
        """The 'body_empty' app responds with an empty body."""
        self.load('body_empty')

        assert self.get()['body'] == '', 'body empty'

    def test_perl_application_body_array(self):
        """The 'body_array' app responds with the body '0123456789'."""
        self.load('body_array')

        assert self.get()['body'] == '0123456789', 'body array'

    def test_perl_application_body_large(self):
        """A 10000-character POST body is returned intact by 'variables'."""
        self.load('variables')

        body = '0123456789' * 1000

        resp = self.post(body=body)['body']

        assert resp == body, 'body large'

    def test_perl_application_body_io_empty(self):
        """The 'body_io_empty' app responds with status 200."""
        self.load('body_io_empty')

        assert self.get()['status'] == 200, 'body io empty'

    def test_perl_application_body_io_file(self):
        """The 'body_io_file' app responds with the body 'body\\n'."""
        self.load('body_io_file')

        assert self.get()['body'] == 'body\n', 'body io file'

    @pytest.mark.skip('not yet')
    def test_perl_application_syntax_error(self, skip_alert):
        """The 'syntax_error' app yields status 500 (currently skipped)."""
        skip_alert(r'PSGI: Failed to parse script')
        self.load('syntax_error')

        assert self.get()['status'] == 500, 'syntax error'

    def test_perl_keepalive_body(self):
        """Two POST requests sent over one socket both get their body back."""
        self.load('variables')

        assert self.get()['status'] == 200, 'init'

        # First request: start=True makes the helper return the socket
        # together with the response so that it can be reused below.
        body = '0123456789' * 500
        (resp, sock) = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body=body,
            read_timeout=1,
        )

        assert resp['body'] == body, 'keep-alive 1'

        # Second request on the same socket, asking for connection close.
        body = '0123456789'
        resp = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body=body,
        )

        assert resp['body'] == body, 'keep-alive 2'

    def test_perl_body_io_fake(self):
        """The 'body_io_fake' app returns '21' and two records are found."""
        self.load('body_io_fake')

        assert self.get()['body'] == '21', 'body io fake'

        assert (
            self.wait_for_record(r'\[error\].+IOFake getline\(\) \$\/ is \d+')
            is not None
        ), 'body io fake $/ value'

        assert (
            self.wait_for_record(r'\[error\].+IOFake close\(\) called')
            is not None
        ), 'body io fake close'

    def test_perl_delayed_response(self):
        """The 'delayed_response' app answers 200 with 'Hello World!'."""
        self.load('delayed_response')

        resp = self.get()

        assert resp['status'] == 200, 'status'
        assert resp['body'] == 'Hello World!', 'body'

    def test_perl_streaming_body(self):
        """The 'streaming_body' app answers 200 with 'Hello World!'."""
        self.load('streaming_body')

        resp = self.get()

        assert resp['status'] == 200, 'status'
        assert resp['body'] == 'Hello World!', 'body'

    def test_perl_application_threads(self):
        """Four concurrent requests each report a distinct 'X-Thread' value.

        The application is configured with 4 threads, four requests are sent
        without reading their responses, and then every response is read and
        checked. The test passes when the number of distinct 'X-Thread'
        header values equals the number of requests.
        """
        self.load('threads')

        assert 'success' in self.conf(
            '4', 'applications/threads/threads'
        ), 'configure 4 threads'

        socks = []

        try:
            # Send all requests first (no_recv=True) so that they are
            # outstanding at the same time; responses are read afterwards.
            for _ in range(4):
                _resp, sock = self.get(
                    headers={
                        'Host': 'localhost',
                        'X-Delay': '2',
                        'Connection': 'close',
                    },
                    no_recv=True,
                    start=True,
                )

                socks.append(sock)

            threads = set()

            for sock in socks:
                resp = self.recvall(sock).decode('utf-8')

                self.log_in(resp)

                resp = self._resp_to_dict(resp)

                assert resp['status'] == 200, 'status'

                threads.add(resp['headers']['X-Thread'])

                assert (
                    resp['headers']['Psgi-Multithread'] == '1'
                ), 'multithread'

                sock.close()
        finally:
            # Release every opened socket even when an assertion above fails;
            # closing an already closed socket is harmless.
            for sock in socks:
                sock.close()

        assert len(socks) == len(threads), 'threads differs'