import unittest
from unit.applications.lang.perl import TestApplicationPerl


class TestPerlApplication(TestApplicationPerl):
    """Request/response checks against the Perl test applications.

    Each test loads a named application with ``self.load``, issues a
    request with ``self.get`` or ``self.post`` and asserts on the parsed
    response (status, headers, body) or on the log contents.
    """

    prerequisites = ['perl']

    def test_perl_application(self):
        # Full round trip: POST a body and compare every response header.
        self.load('variables')

        body = 'Test body string.'
        request_headers = {
            'Host': 'localhost',
            'Content-Type': 'text/html',
            'Custom-Header': 'blah',
            'Connection': 'close',
        }

        resp = self.post(headers=request_headers, body=body)

        self.assertEqual(resp['status'], 200, 'status')

        # Headers whose value varies per run are popped and checked on
        # their own, so the remaining dict can be compared exactly.
        headers = resp['headers']
        header_server = headers.pop('Server')
        self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header')

        server_software = headers.pop('Server-Software')
        self.assertEqual(
            server_software, header_server, 'server software header'
        )

        date = headers.pop('Date')
        self.assertEqual(date[-4:], ' GMT', 'date header timezone')

        # The Date header must be within 5 seconds of sec_epoch().
        clock_skew = abs(self.date_to_sec_epoch(date) - self.sec_epoch())
        self.assertLess(clock_skew, 5, 'date header')

        expected_headers = {
            'Connection': 'close',
            'Content-Length': str(len(body)),
            'Content-Type': 'text/html',
            'Request-Method': 'POST',
            'Request-Uri': '/',
            'Http-Host': 'localhost',
            'Server-Protocol': 'HTTP/1.1',
            'Custom-Header': 'blah',
            'Psgi-Version': '11',
            'Psgi-Url-Scheme': 'http',
            'Psgi-Multithread': '',
            'Psgi-Multiprocess': '1',
            'Psgi-Run-Once': '',
            'Psgi-Nonblocking': '',
            'Psgi-Streaming': '1',
        }
        self.assertDictEqual(headers, expected_headers, 'headers')
        self.assertEqual(resp['body'], body, 'body')

    def test_perl_application_query_string(self):
        # The query part of the URL is echoed in the Query-String header.
        self.load('query_string')

        resp = self.get(url='/?var1=val1&var2=val2')

        query_string = resp['headers']['Query-String']
        self.assertEqual(
            query_string, 'var1=val1&var2=val2', 'Query-String header'
        )

    def test_perl_application_query_string_empty(self):
        # A bare '?' must produce an empty Query-String header.
        self.load('query_string')

        resp = self.get(url='/?')

        self.assertEqual(resp['status'], 200, 'query string empty status')

        query_string = resp['headers']['Query-String']
        self.assertEqual(query_string, '', 'query string empty')

    @unittest.expectedFailure
    def test_perl_application_query_string_absent(self):
        # Marked as an expected failure: no '?' in the URL at all.
        self.load('query_string')

        resp = self.get()

        self.assertEqual(resp['status'], 200, 'query string absent status')

        query_string = resp['headers']['Query-String']
        self.assertEqual(query_string, '', 'query string absent')

    @unittest.expectedFailure
    def test_perl_application_server_port(self):
        # Marked as an expected failure.
        self.load('server_port')

        server_port = self.get()['headers']['Server-Port']
        self.assertEqual(server_port, '7080', 'Server-Port header')

    def test_perl_application_input_read_empty(self):
        self.load('input_read_empty')

        body = self.get()['body']
        self.assertEqual(body, '', 'read empty')

    def test_perl_application_input_read_parts(self):
        self.load('input_read_parts')

        body = self.post(body='0123456789')['body']
        self.assertEqual(body, '0123456789', 'input read parts')

    @unittest.expectedFailure
    def test_perl_application_input_read_offset(self):
        # Marked as an expected failure.
        self.load('input_read_offset')

        body = self.post(body='0123456789')['body']
        self.assertEqual(body, '4567', 'read offset')

    def test_perl_application_input_copy(self):
        self.load('input_copy')

        body = '0123456789'
        echoed = self.post(body=body)['body']
        self.assertEqual(echoed, body, 'input copy')

    def test_perl_application_errors_print(self):
        self.load('errors_print')

        body = self.get()['body']
        self.assertEqual(body, '1', 'errors result')

        # stop() is called before the log is searched.
        self.stop()

        log_match = self.search_in_log(r'\[error\].+Error in application')
        self.assertIsNotNone(log_match, 'errors print')

    def test_perl_application_header_equal_names(self):
        # Two headers with the same name are compared as a list of values.
        self.load('header_equal_names')

        cookies = self.get()['headers']['Set-Cookie']
        self.assertListEqual(
            cookies,
            ['tc=one,two,three', 'tc=four,five,six'],
            'header equal names',
        )

    def test_perl_application_header_pairs(self):
        self.load('header_pairs')

        header_value = self.get()['headers']['blah']
        self.assertEqual(header_value, 'blah', 'header pairs')

    def test_perl_application_body_empty(self):
        self.load('body_empty')

        body = self.get()['body']
        self.assertEqual(body, '0\r\n\r\n', 'body empty')

    def test_perl_application_body_array(self):
        self.load('body_array')

        body = self.get()['body']
        self.assertEqual(body, '0123456789', 'body array')

    def test_perl_application_body_large(self):
        self.load('variables')

        body = '0123456789' * 1000

        resp = self.post(body=body)['body']

        self.assertEqual(resp, body, 'body large')

    def test_perl_application_body_io_empty(self):
        self.load('body_io_empty')

        status = self.get()['status']
        self.assertEqual(status, 200, 'body io empty')

    def test_perl_application_body_io_file(self):
        self.load('body_io_file')

        body = self.get()['body']
        self.assertEqual(body, 'body\n', 'body io file')

    @unittest.expectedFailure
    def test_perl_application_syntax_error(self):
        # Marked as an expected failure. The alert patterns are added to
        # skip_alerts before the application is loaded.
        ignored_alerts = [
            r'PSGI: Failed to parse script',
            r'process \d+ exited on signal',
        ]
        self.skip_alerts.extend(ignored_alerts)
        self.load('syntax_error')

        status = self.get()['status']
        self.assertEqual(status, 500, 'syntax error')

    def test_perl_keepalive_body(self):
        # Two POSTs over one socket: the first keeps the connection open
        # (start=True returns the socket), the second closes it.
        self.load('variables')

        first_headers = {
            'Host': 'localhost',
            'Connection': 'keep-alive',
            'Content-Type': 'text/html',
        }
        (resp, sock) = self.post(
            headers=first_headers, start=True, body='0123456789' * 500
        )

        self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1')

        second_headers = {
            'Host': 'localhost',
            'Connection': 'close',
            'Content-Type': 'text/html',
        }
        resp = self.post(headers=second_headers, sock=sock, body='0123456789')

        self.assertEqual(resp['body'], '0123456789', 'keep-alive 2')

    def test_perl_body_io_fake(self):
        self.load('body_io_fake')

        body = self.get()['body']
        self.assertEqual(body, '21', 'body io fake')

        # Both messages are looked up in the log after the request.
        getline_match = self.search_in_log(
            r'\[error\].+IOFake getline\(\) \$\/ is \d+'
        )
        self.assertIsNotNone(getline_match, 'body io fake $/ value')

        close_match = self.search_in_log(r'\[error\].+IOFake close\(\) called')
        self.assertIsNotNone(close_match, 'body io fake close')

    def test_perl_delayed_response(self):
        self.load('delayed_response')

        resp = self.get()

        self.assertEqual(resp['status'], 200, 'status')
        self.assertEqual(resp['body'], 'Hello World!', 'body')

    def test_perl_streaming_body(self):
        self.load('streaming_body')

        resp = self.get()

        self.assertEqual(resp['status'], 200, 'status')
        self.assertEqual(resp['body'], 'Hello World!', 'body')


if __name__ == '__main__':
    TestPerlApplication.main()