1import unittest 2from unit.applications.lang.perl import TestApplicationPerl 3 4 5class TestPerlApplication(TestApplicationPerl): 6 prerequisites = {'modules': ['perl']} 7 8 def test_perl_application(self): 9 self.load('variables') 10 11 body = 'Test body string.' 12 13 resp = self.post( 14 headers={ 15 'Host': 'localhost', 16 'Content-Type': 'text/html', 17 'Custom-Header': 'blah', 18 'Connection': 'close', 19 }, 20 body=body, 21 ) 22 23 self.assertEqual(resp['status'], 200, 'status') 24 headers = resp['headers'] 25 header_server = headers.pop('Server') 26 self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header') 27 self.assertEqual( 28 headers.pop('Server-Software'), 29 header_server, 30 'server software header', 31 ) 32 33 date = headers.pop('Date') 34 self.assertEqual(date[-4:], ' GMT', 'date header timezone') 35 self.assertLess( 36 abs(self.date_to_sec_epoch(date) - self.sec_epoch()), 37 5, 38 'date header', 39 ) 40 41 self.assertDictEqual( 42 headers, 43 { 44 'Connection': 'close', 45 'Content-Length': str(len(body)), 46 'Content-Type': 'text/html', 47 'Request-Method': 'POST', 48 'Request-Uri': '/', 49 'Http-Host': 'localhost', 50 'Server-Protocol': 'HTTP/1.1', 51 'Custom-Header': 'blah', 52 'Psgi-Version': '11', 53 'Psgi-Url-Scheme': 'http', 54 'Psgi-Multithread': '', 55 'Psgi-Multiprocess': '1', 56 'Psgi-Run-Once': '', 57 'Psgi-Nonblocking': '', 58 'Psgi-Streaming': '1', 59 }, 60 'headers', 61 ) 62 self.assertEqual(resp['body'], body, 'body') 63 64 def test_perl_application_query_string(self): 65 self.load('query_string') 66 67 resp = self.get(url='/?var1=val1&var2=val2') 68 69 self.assertEqual( 70 resp['headers']['Query-String'], 71 'var1=val1&var2=val2', 72 'Query-String header', 73 ) 74 75 def test_perl_application_query_string_empty(self): 76 self.load('query_string') 77 78 resp = self.get(url='/?') 79 80 self.assertEqual(resp['status'], 200, 'query string empty status') 81 self.assertEqual( 82 resp['headers']['Query-String'], '', 'query string empty' 83 ) 84 85 def test_perl_application_query_string_absent(self): 86 self.load('query_string') 87 88 resp = self.get() 89 90 self.assertEqual(resp['status'], 200, 'query string absent status') 91 self.assertEqual( 92 resp['headers']['Query-String'], '', 'query string absent' 93 ) 94 95 @unittest.skip('not yet') 96 def test_perl_application_server_port(self): 97 self.load('server_port') 98 99 self.assertEqual( 100 self.get()['headers']['Server-Port'], '7080', 'Server-Port header' 101 ) 102 103 def test_perl_application_input_read_empty(self): 104 self.load('input_read_empty') 105 106 self.assertEqual(self.get()['body'], '', 'read empty') 107 108 def test_perl_application_input_read_parts(self): 109 self.load('input_read_parts') 110 111 self.assertEqual( 112 self.post(body='0123456789')['body'], 113 '0123456789', 114 'input read parts', 115 ) 116 117 @unittest.skip('not yet') 118 def test_perl_application_input_read_offset(self): 119 self.load('input_read_offset') 120 121 self.assertEqual( 122 self.post(body='0123456789')['body'], '4567', 'read offset' 123 ) 124 125 def test_perl_application_input_copy(self): 126 self.load('input_copy') 127 128 body = '0123456789' 129 self.assertEqual(self.post(body=body)['body'], body, 'input copy') 130 131 def test_perl_application_errors_print(self): 132 self.load('errors_print') 133 134 self.assertEqual(self.get()['body'], '1', 'errors result') 135 136 self.stop() 137 138 self.assertIsNotNone( 139 self.wait_for_record(r'\[error\].+Error in application'), 140 'errors print', 141 ) 142 143 def test_perl_application_header_equal_names(self): 144 self.load('header_equal_names') 145 146 self.assertListEqual( 147 self.get()['headers']['Set-Cookie'], 148 ['tc=one,two,three', 'tc=four,five,six'], 149 'header equal names', 150 ) 151 152 def test_perl_application_header_pairs(self): 153 self.load('header_pairs') 154 155 self.assertEqual(self.get()['headers']['blah'], 'blah', 'header pairs') 156 157 def test_perl_application_body_empty(self): 158 self.load('body_empty') 159 160 self.assertEqual(self.get()['body'], '', 'body empty') 161 162 def test_perl_application_body_array(self): 163 self.load('body_array') 164 165 self.assertEqual(self.get()['body'], '0123456789', 'body array') 166 167 def test_perl_application_body_large(self): 168 self.load('variables') 169 170 body = '0123456789' * 1000 171 172 resp = self.post(body=body)['body'] 173 174 self.assertEqual(resp, body, 'body large') 175 176 def test_perl_application_body_io_empty(self): 177 self.load('body_io_empty') 178 179 self.assertEqual(self.get()['status'], 200, 'body io empty') 180 181 def test_perl_application_body_io_file(self): 182 self.load('body_io_file') 183 184 self.assertEqual(self.get()['body'], 'body\n', 'body io file') 185 186 @unittest.skip('not yet, unsafe') 187 def test_perl_application_syntax_error(self): 188 self.skip_alerts.extend( 189 [r'PSGI: Failed to parse script'] 190 ) 191 self.load('syntax_error') 192 193 self.assertEqual(self.get()['status'], 500, 'syntax error') 194 195 def test_perl_keepalive_body(self): 196 self.load('variables') 197 198 self.assertEqual(self.get()['status'], 200, 'init') 199 200 (resp, sock) = self.post( 201 headers={ 202 'Host': 'localhost', 203 'Connection': 'keep-alive', 204 'Content-Type': 'text/html', 205 }, 206 start=True, 207 body='0123456789' * 500, 208 read_timeout=1, 209 ) 210 211 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1') 212 213 resp = self.post( 214 headers={ 215 'Host': 'localhost', 216 'Connection': 'close', 217 'Content-Type': 'text/html', 218 }, 219 sock=sock, 220 body='0123456789', 221 ) 222 223 self.assertEqual(resp['body'], '0123456789', 'keep-alive 2') 224 225 def test_perl_body_io_fake(self): 226 self.load('body_io_fake') 227 228 self.assertEqual(self.get()['body'], '21', 'body io fake') 229 230 self.assertIsNotNone( 231 self.wait_for_record(r'\[error\].+IOFake getline\(\) \$\/ is \d+'), 232 'body io fake $/ value', 233 ) 234 235 self.assertIsNotNone( 236 self.wait_for_record(r'\[error\].+IOFake close\(\) called'), 237 'body io fake close', 238 ) 239 240 def test_perl_delayed_response(self): 241 self.load('delayed_response') 242 243 resp = self.get() 244 245 self.assertEqual(resp['status'], 200, 'status') 246 self.assertEqual(resp['body'], 'Hello World!', 'body') 247 248 def test_perl_streaming_body(self): 249 self.load('streaming_body') 250 251 resp = self.get() 252 253 self.assertEqual(resp['status'], 200, 'status') 254 self.assertEqual(resp['body'], 'Hello World!', 'body') 255 256 257if __name__ == '__main__': 258 TestPerlApplication.main() 259