"""Tests for several ``targets`` declared inside one Python ASGI application."""

import pytest
from packaging import version

from unit.applications.lang.python import ApplicationPython
from unit.option import option

# Requires a "python" module whose version is at least 3.5.
# NOTE(review): presumably consumed by the test harness to skip this file
# when the requirement is not met -- the consumer is not visible here.
prerequisites = {
    'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
}

# Shared client used by every test to push configuration and issue requests.
client = ApplicationPython(load_module='asgi')


@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Apply the baseline configuration before every test.

    The configuration has one listener on ``*:8080`` passing to ``routes``,
    two routes matching the exact URIs ``/1`` and ``/2``, and a single
    ``targets`` application with two ASGI targets: ``"1"`` (callable
    ``application_200``) and ``"2"`` (callable ``application_201``).
    Both targets load the ``asgi`` module from
    ``<option.test_dir>/python/targets/``.
    """
    path = f'{option.test_dir}/python/targets/'

    assert 'success' in client.conf(
        {
            "listeners": {"*:8080": {"pass": "routes"}},
            "routes": [
                {
                    "match": {"uri": "/1"},
                    "action": {"pass": "applications/targets/1"},
                },
                {
                    "match": {"uri": "/2"},
                    "action": {"pass": "applications/targets/2"},
                },
            ],
            "applications": {
                "targets": {
                    "type": client.get_application_type(),
                    "processes": {"spare": 0},
                    "working_directory": path,
                    "path": path,
                    "protocol": "asgi",
                    "targets": {
                        "1": {
                            "module": "asgi",
                            "callable": "application_200",
                        },
                        "2": {
                            "module": "asgi",
                            "callable": "application_201",
                        },
                    },
                }
            },
        }
    )


def conf_targets(targets):
    """Replace the application's ``targets`` object and assert it was accepted.

    Args:
        targets: Mapping of target name to target options, written to the
            ``applications/targets/targets`` configuration path.
    """
    assert 'success' in client.conf(targets, 'applications/targets/targets')


def test_asgi_targets():
    """Each baseline target answers with its own status code."""
    assert client.get(url='/1')['status'] == 200
    assert client.get(url='/2')['status'] == 201


def test_asgi_targets_legacy():
    """Both targets work when they point at the ``legacy_*`` callables."""
    conf_targets(
        {
            "1": {"module": "asgi", "callable": "legacy_application_200"},
            "2": {"module": "asgi", "callable": "legacy_application_201"},
        }
    )

    assert client.get(url='/1')['status'] == 200
    assert client.get(url='/2')['status'] == 201


def test_asgi_targets_mix():
    """A regular and a ``legacy_*`` callable can coexist in one application."""
    conf_targets(
        {
            "1": {"module": "asgi", "callable": "application_200"},
            "2": {"module": "asgi", "callable": "legacy_application_201"},
        }
    )

    assert client.get(url='/1')['status'] == 200
    assert client.get(url='/2')['status'] == 201


def test_asgi_targets_broken(skip_alert):
    """A target with a missing callable keeps the valid target from serving.

    Args:
        skip_alert: Fixture defined outside this file.
            NOTE(review): presumably registers a log pattern that the
            harness should not treat as a failure -- confirm in conftest.
    """
    skip_alert(r'Python failed to get "blah" from module')

    conf_targets(
        {
            "1": {"module": "asgi", "callable": "application_200"},
            "2": {"module": "asgi", "callable": "blah"},
        }
    )

    # Target "1" itself is valid, yet it is expected not to answer 200
    # because its sibling target "2" names a callable that does not exist.
    assert client.get(url='/1')['status'] != 200


def test_asgi_targets_prefix():
    """Check the prefix reported for targets configured with ``prefix``.

    Target ``"1"`` uses prefix ``/1/`` and target ``"2"`` uses ``/api``; both
    run the ``application_prefix`` callable. The routes are replaced so that
    URIs matching ``/1*`` go to target ``"1"`` and everything else goes to
    target ``"2"``. Each response is expected to carry a ``prefix`` header
    holding either the prefix without a trailing slash or the string
    ``NULL``, as listed in the table below.
    """
    conf_targets(
        {
            "1": {
                "module": "asgi",
                "callable": "application_prefix",
                "prefix": "/1/",
            },
            "2": {
                "module": "asgi",
                "callable": "application_prefix",
                "prefix": "/api",
            },
        }
    )
    # Verify that the new routes were accepted, like every other configuration
    # call in this file; otherwise the fixture's exact-match routes would stay
    # active and the checks below would fail for an unrelated reason.
    assert 'success' in client.conf(
        [
            {
                "match": {"uri": "/1*"},
                "action": {"pass": "applications/targets/1"},
            },
            {
                "match": {"uri": "*"},
                "action": {"pass": "applications/targets/2"},
            },
        ],
        "routes",
    )

    def check_prefix(url, prefix):
        """Request *url* and assert a 200 status and the expected header."""
        resp = client.get(url=url)
        assert resp['status'] == 200
        assert resp['headers']['prefix'] == prefix

    # Routed to target "1" (prefix "/1/").
    check_prefix('/1', '/1')
    check_prefix('/11', 'NULL')
    check_prefix('/1/', '/1')
    # Routed to target "2" (prefix "/api").
    check_prefix('/', 'NULL')
    check_prefix('/ap', 'NULL')
    check_prefix('/api', '/api')
    check_prefix('/api/', '/api')
    check_prefix('/api/test/', '/api')
    check_prefix('/apis', 'NULL')
    check_prefix('/apis/', 'NULL')