nxt_application.c (206:86a529b2ea9b) nxt_application.c (216:07257705cd64)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) Igor Sysoev
5 * Copyright (C) Valentin V. Bartenev
6 * Copyright (C) NGINX, Inc.
7 */
8
9#include <nxt_main.h>
10#include <nxt_runtime.h>
11#include <nxt_application.h>
12#include <nxt_master_process.h>
13
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) Igor Sysoev
5 * Copyright (C) Valentin V. Bartenev
6 * Copyright (C) NGINX, Inc.
7 */
8
9#include <nxt_main.h>
10#include <nxt_runtime.h>
11#include <nxt_application.h>
12#include <nxt_master_process.h>
13
14#include <glob.h>
14
15
15nxt_application_module_t *nxt_app_modules[NXT_APP_MAX];
16
16
17typedef struct {
18 nxt_str_t type;
19 nxt_str_t version;
20 nxt_str_t file;
21} nxt_module_t;
22
23
24static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
25static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
26 nxt_array_t *modules, const char *name);
27static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
28 const char *name);
29
30
17static nxt_thread_mutex_t nxt_app_mutex;
18static nxt_thread_cond_t nxt_app_cond;
19
20static nxt_http_fields_hash_entry_t nxt_app_request_fields[];
21static nxt_http_fields_hash_t *nxt_app_request_fields_hash;
22
23static nxt_application_module_t *nxt_app;
24
31static nxt_thread_mutex_t nxt_app_mutex;
32static nxt_thread_cond_t nxt_app_cond;
33
34static nxt_http_fields_hash_entry_t nxt_app_request_fields[];
35static nxt_http_fields_hash_t *nxt_app_request_fields_hash;
36
37static nxt_application_module_t *nxt_app;
38
39
25nxt_int_t
40nxt_int_t
41nxt_discovery_start(nxt_task_t *task, void *data)
42{
43 nxt_buf_t *b;
44 nxt_port_t *main_port;
45 nxt_runtime_t *rt;
46
47 nxt_debug(task, "DISCOVERY");
48
49 b = nxt_discovery_modules(task, "build/nginext.*");
50
51 rt = task->thread->runtime;
52 main_port = rt->port_by_type[NXT_PROCESS_MASTER];
53
54 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
55 0, -1, b);
56
57 return NXT_OK;
58}
59
60
61static void
62nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
63{
64 nxt_mp_t *mp;
65 nxt_buf_t *b;
66
67 b = obj;
68 mp = b->data;
69
70 nxt_mp_destroy(mp);
71
72 exit(0);
73}
74
75
76static nxt_buf_t *
77nxt_discovery_modules(nxt_task_t *task, const char *path)
78{
79 char *name;
80 u_char *p, *end;
81 size_t size;
82 glob_t glb;
83 nxt_mp_t *mp;
84 nxt_buf_t *b;
85 nxt_int_t ret;
86 nxt_uint_t i, n;
87 nxt_array_t *modules;
88 nxt_module_t *module;
89
90 b = NULL;
91
92 mp = nxt_mp_create(1024, 128, 256, 32);
93 if (mp == NULL) {
94 return b;
95 }
96
97 ret = glob(path, 0, NULL, &glb);
98
99 if (ret == 0) {
100 n = glb.gl_pathc;
101
102 modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
103 if (modules == NULL) {
104 goto fail;
105 }
106
107 for (i = 0; i < n; i++) {
108 name = glb.gl_pathv[i];
109
110 ret = nxt_discovery_module(task, mp, modules, name);
111 if (ret != NXT_OK) {
112 goto fail;
113 }
114 }
115
116 size = sizeof("[]") - 1;
117 module = modules->elts;
118 n = modules->nelts;
119
120 for (i = 0; i < n; i++) {
121 nxt_debug(task, "module: %V %V %V",
122 &module[i].type, &module[i].version, &module[i].file);
123
124 size += sizeof("{\"type\": \"\",") - 1;
125 size += sizeof(" \"version\": \"\",") - 1;
126 size += sizeof(" \"file\": \"\"},") - 1;
127
128 size += module[i].type.length
129 + module[i].version.length
130 + module[i].file.length;
131 }
132
133 b = nxt_buf_mem_alloc(mp, size, 0);
134 if (b == NULL) {
135 goto fail;
136 }
137
138 b->completion_handler = nxt_discovery_completion_handler;
139
140 p = b->mem.free;
141 end = b->mem.end;
142 *p++ = '[';
143
144 for (i = 0; i < n; i++) {
145 p = nxt_sprintf(p, end,
146 "{\"type\": \"%V\", \"version\": \"%V\", \"file\": \"%V\"},",
147 &module[i].type, &module[i].version, &module[i].file);
148 }
149
150 *p++ = ']';
151 b->mem.free = p;
152 }
153
154fail:
155
156 globfree(&glb);
157
158 return b;
159}
160
161
162static nxt_int_t
163nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
164 const char *name)
165{
166 void *dl;
167 nxt_str_t *s;
168 nxt_int_t ret;
169 nxt_uint_t i, n;
170 nxt_module_t *module;
171 nxt_application_module_t *app;
172
173 /*
174 * Only memory allocation failure should return NXT_ERROR.
175 * Any module processing errors are ignored.
176 */
177 ret = NXT_ERROR;
178
179 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
180
181 if (dl == NULL) {
182 nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"",
183 name, dlerror());
184 return NXT_OK;
185 }
186
187 app = dlsym(dl, "nxt_app_module");
188
189 if (app != NULL) {
190 nxt_log(task, NXT_LOG_NOTICE, "module: %V \"%s\"",
191 &app->version, name);
192
193 module = modules->elts;
194 n = modules->nelts;
195
196 for (i = 0; i < n; i++) {
197 if (nxt_strstr_eq(&app->version, &module[i].version)) {
198 nxt_log(task, NXT_LOG_NOTICE,
199 "ignoring %s module with the same "
200 "application language version %V as in %s",
201 name, &module[i].version, &module[i].file);
202
203 goto done;
204 }
205 }
206
207 module = nxt_array_add(modules);
208 if (module == NULL) {
209 goto fail;
210 }
211
212 s = nxt_str_dup(mp, &module->type, &app->type);
213 if (s == NULL) {
214 goto fail;
215 }
216
217 s = nxt_str_dup(mp, &module->version, &app->version);
218 if (s == NULL) {
219 goto fail;
220 }
221
222 module->file.length = nxt_strlen(name);
223
224 module->file.start = nxt_mp_alloc(mp, module->file.length);
225 if (module->file.start == NULL) {
226 goto fail;
227 }
228
229 nxt_memcpy(module->file.start, name, module->file.length);
230
231 } else {
232 nxt_log(task, NXT_LOG_CRIT, "dlsym(\"%s\"), failed: \"%s\"",
233 name, dlerror());
234 }
235
236done:
237
238 ret = NXT_OK;
239
240fail:
241
242 if (dlclose(dl) != 0) {
243 nxt_log(task, NXT_LOG_CRIT, "dlclose(\"%s\"), failed: \"%s\"",
244 name, dlerror());
245 }
246
247 return ret;
248}
249
250
251nxt_int_t
26nxt_app_start(nxt_task_t *task, void *data)
27{
252nxt_app_start(nxt_task_t *task, void *data)
253{
28 nxt_int_t ret;
29 nxt_common_app_conf_t *app_conf;
254 nxt_int_t ret;
255 nxt_app_lang_module_t *lang;
256 nxt_common_app_conf_t *app_conf;
30
31 app_conf = data;
32
257
258 app_conf = data;
259
260 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
261 if (nxt_slow_path(lang == NULL)) {
262 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
263 &app_conf->type);
264 return NXT_ERROR;
265 }
266
267 nxt_app = lang->module;
268
269 if (nxt_app == NULL) {
270 nxt_debug(task, "application language module: %V \"%s\"",
271 &lang->version, lang->file);
272
273 nxt_app = nxt_app_module_load(task, lang->file);
274 }
275
33 if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
34 return NXT_ERROR;
35 }
36
37 if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
38 return NXT_ERROR;
39 }
40
276 if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
277 return NXT_ERROR;
278 }
279
280 if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
281 return NXT_ERROR;
282 }
283
41 nxt_app = nxt_app_modules[app_conf->type_id];
42
43 ret = nxt_app->init(task, data);
44
45 if (nxt_slow_path(ret != NXT_OK)) {
46 nxt_debug(task, "application init failed");
47
48 } else {
49 nxt_debug(task, "application init done");
50 }
51
52 return ret;
53}
54
55
284 ret = nxt_app->init(task, data);
285
286 if (nxt_slow_path(ret != NXT_OK)) {
287 nxt_debug(task, "application init failed");
288
289 } else {
290 nxt_debug(task, "application init done");
291 }
292
293 return ret;
294}
295
296
297static nxt_app_module_t *
298nxt_app_module_load(nxt_task_t *task, const char *name)
299{
300 void *dl;
301
302 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
303
304 if (dl != NULL) {
305 return dlsym(dl, "nxt_app_module");
306 }
307
308 nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"",
309 name, dlerror());
310
311 return NULL;
312}
313
314
56nxt_int_t
57nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt)
58{
59 nxt_http_fields_hash_t *hash;
60
61 hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool);
62 if (nxt_slow_path(hash == NULL)) {
63 return NXT_ERROR;
64 }
65
66 nxt_app_request_fields_hash = hash;
67
68 return NXT_OK;
69}
70
71
72void
73nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
74{
75 size_t dump_size;
76 nxt_buf_t *b;
77 nxt_port_t *port;
78 nxt_app_rmsg_t rmsg = { msg->buf };
79 nxt_app_wmsg_t wmsg;
80
81 b = msg->buf;
82 dump_size = b->mem.free - b->mem.pos;
83
84 if (dump_size > 300) {
85 dump_size = 300;
86 }
87
88 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
89
90 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
91 msg->port_msg.reply_port);
92 if (nxt_slow_path(port == NULL)) {
93 //
94 }
95
96 wmsg.port = port;
97 wmsg.write = NULL;
98 wmsg.buf = &wmsg.write;
99 wmsg.stream = msg->port_msg.stream;
100
101 nxt_app->run(task, &rmsg, &wmsg);
102}
103
104
105nxt_inline nxt_port_t *
106nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg)
107{
108 return msg->port;
109}
110
111
112u_char *
113nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
114{
115 size_t free_size;
116 u_char *res;
117 nxt_buf_t *b;
118 nxt_port_t *port;
119
120 res = NULL;
121
122 do {
123 b = *msg->buf;
124
125 if (b == NULL) {
126 port = nxt_app_msg_get_port(task, msg);
127 if (nxt_slow_path(port == NULL)) {
128 return NULL;
129 }
130
131 b = nxt_port_mmap_get_buf(task, port, size);
132 if (nxt_slow_path(b == NULL)) {
133 return NULL;
134 }
135
136 *msg->buf = b;
137
138 free_size = nxt_buf_mem_free_size(&b->mem);
139
140 if (nxt_slow_path(free_size < size)) {
141 nxt_debug(task, "requested buffer too big (%z < %z)",
142 free_size, size);
143 return NULL;
144 }
145
146 }
147
148 free_size = nxt_buf_mem_free_size(&b->mem);
149
150 if (free_size >= size) {
151 res = b->mem.free;
152 b->mem.free += size;
153
154 return res;
155 }
156
157 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
158 res = b->mem.free;
159 b->mem.free += size;
160
161 return res;
162 }
163
164 msg->buf = &b->next;
165 } while(1);
166}
167
168
169nxt_int_t
170nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
171{
172 u_char *dst;
173 size_t dst_length;
174
175 if (c != NULL) {
176 dst_length = size + (size < 128 ? 1 : 4) + 1;
177
178 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
179 if (nxt_slow_path(dst == NULL)) {
180 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
181 dst_length);
182 return NXT_ERROR;
183 }
184
185 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
186
187 nxt_memcpy(dst, c, size);
188 dst[size] = 0;
189
190 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c);
191 } else {
192 dst_length = 1;
193
194 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
195 if (nxt_slow_path(dst == NULL)) {
196 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
197 dst_length);
198 return NXT_ERROR;
199 }
200
201 dst = nxt_app_msg_write_length(dst, 0);
202
203 nxt_debug(task, "nxt_app_msg_write: NULL");
204 }
205
206 return NXT_OK;
207}
208
209
210nxt_int_t
211nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
212 const nxt_str_t *prefix, const nxt_str_t *v)
213{
214 u_char *dst, *src;
215 size_t i, length, dst_length;
216
217 length = prefix->length + v->length;
218
219 dst_length = length + (length < 128 ? 1 : 4) + 1;
220
221 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
222 if (nxt_slow_path(dst == NULL)) {
223 return NXT_ERROR;
224 }
225
226 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
227
228 nxt_memcpy(dst, prefix->start, prefix->length);
229 dst += prefix->length;
230
231 src = v->start;
232 for (i = 0; i < v->length; i++, dst++, src++) {
233
234 if (*src >= 'a' && *src <= 'z') {
235 *dst = *src & ~0x20;
236 continue;
237 }
238
239 if (*src == '-') {
240 *dst = '_';
241 continue;
242 }
243
244 *dst = *src;
245 }
246
247 *dst = 0;
248
249 return NXT_OK;
250}
251
252
253nxt_int_t
254nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
255{
256 size_t length;
257 nxt_buf_t *buf;
258
259 do {
260 buf = msg->buf;
261
262 if (nxt_slow_path(buf == NULL)) {
263 return NXT_DONE;
264 }
265
266 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
267 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
268 msg->buf = buf->next;
269 continue;
270 }
271 return NXT_ERROR;
272 }
273
274 if (buf->mem.pos[0] >= 128) {
275 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
276 return NXT_ERROR;
277 }
278 }
279
280 break;
281 } while (1);
282
283 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length);
284
285 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length))
286 {
287 return NXT_ERROR;
288 }
289
290 if (length > 0) {
291 str->start = buf->mem.pos;
292 str->length = length - 1;
293
294 buf->mem.pos += length;
295
296 nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1,
297 (int)length - 1, str->start);
298 } else {
299 str->start = NULL;
300 str->length = 0;
301
302 nxt_debug(task, "nxt_read_str: NULL");
303 }
304
305 return NXT_OK;
306}
307
308
309size_t
310nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
311 size_t size)
312{
313 size_t res, read_size;
314 nxt_buf_t *buf;
315
316 res = 0;
317
318 while (size > 0) {
319 buf = msg->buf;
320
321 if (nxt_slow_path(buf == NULL)) {
322 break;
323 }
324
325 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
326 msg->buf = buf->next;
327 continue;
328 }
329
330 read_size = nxt_buf_mem_used_size(&buf->mem);
331 read_size = nxt_min(read_size, size);
332
333 dst = nxt_cpymem(dst, buf->mem.pos, read_size);
334
335 size -= read_size;
336 buf->mem.pos += read_size;
337 res += read_size;
338 }
339
340 nxt_debug(task, "nxt_read_raw: %uz", res);
341
342 return res;
343}
344
345
346nxt_int_t
347nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
348 nxt_str_t *v)
349{
350 nxt_int_t rc;
351
352 rc = nxt_app_msg_read_str(task, rmsg, n);
353 if (nxt_slow_path(rc != NXT_OK)) {
354 return rc;
355 }
356
357 rc = nxt_app_msg_read_str(task, rmsg, v);
358 if (nxt_slow_path(rc != NXT_OK)) {
359 return rc;
360 }
361
362 return rc;
363}
364
365
366nxt_int_t
367nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
368{
369 nxt_buf_t *buf;
370
371 do {
372 buf = msg->buf;
373
374 if (nxt_slow_path(buf == NULL)) {
375 return NXT_DONE;
376 }
377
378 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
379 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
380 msg->buf = buf->next;
381 continue;
382 }
383 return NXT_ERROR;
384 }
385
386 if (buf->mem.pos[0] >= 128) {
387 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
388 return NXT_ERROR;
389 }
390 }
391
392 break;
393 } while (1);
394
395 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
396
397 nxt_debug(task, "nxt_read_size: %d", (int)*size);
398
399 return NXT_OK;
400}
401
402
403static nxt_int_t
404nxt_app_request_content_length(void *ctx, nxt_http_field_t *field,
405 nxt_log_t *log)
406{
407 nxt_str_t *v;
408 nxt_app_parse_ctx_t *c;
409 nxt_app_request_header_t *h;
410
411 c = ctx;
412 h = &c->r.header;
413 v = &field->value;
414
415 h->content_length = *v;
416 h->parsed_content_length = nxt_off_t_parse(v->start, v->length);
417
418 return NXT_OK;
419}
420
421
422static nxt_int_t
423nxt_app_request_content_type(void *ctx, nxt_http_field_t *field,
424 nxt_log_t *log)
425{
426 nxt_app_parse_ctx_t *c;
427 nxt_app_request_header_t *h;
428
429 c = ctx;
430 h = &c->r.header;
431
432 h->content_type = field->value;
433
434 return NXT_OK;
435}
436
437
438static nxt_int_t
439nxt_app_request_cookie(void *ctx, nxt_http_field_t *field,
440 nxt_log_t *log)
441{
442 nxt_app_parse_ctx_t *c;
443 nxt_app_request_header_t *h;
444
445 c = ctx;
446 h = &c->r.header;
447
448 h->cookie = field->value;
449
450 return NXT_OK;
451}
452
453
454static nxt_int_t
455nxt_app_request_host(void *ctx, nxt_http_field_t *field,
456 nxt_log_t *log)
457{
458 nxt_app_parse_ctx_t *c;
459 nxt_app_request_header_t *h;
460
461 c = ctx;
462 h = &c->r.header;
463
464 h->host = field->value;
465
466 return NXT_OK;
467}
468
469
470static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = {
471 { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 },
472 { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 },
473 { nxt_string("Cookie"), &nxt_app_request_cookie, 0 },
474 { nxt_string("Host"), &nxt_app_request_host, 0 },
475
476 { nxt_null_string, NULL, 0 }
477};
478
479
480nxt_int_t
481nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
482{
483 nxt_int_t rc;
484
485 ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32);
486
487 rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool);
488 if (nxt_slow_path(rc != NXT_OK)) {
489 return rc;
490 }
491
492 ctx->parser.fields_hash = nxt_app_request_fields_hash;
493
494 return NXT_OK;
495}
496
497
498nxt_int_t
499nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
500 nxt_buf_t *buf)
501{
502 nxt_int_t rc;
503 nxt_app_request_body_t *b;
504 nxt_http_request_parse_t *p;
505 nxt_app_request_header_t *h;
506
507 p = &ctx->parser;
508 b = &ctx->r.body;
509 h = &ctx->r.header;
510
511 nxt_assert(h->done == 0);
512
513 rc = nxt_http_parse_request(p, &buf->mem);
514
515 if (nxt_slow_path(rc != NXT_DONE)) {
516 return rc;
517 }
518
519 rc = nxt_http_fields_process(p->fields, ctx, task->log);
520
521 if (nxt_slow_path(rc != NXT_OK)) {
522 return rc;
523 }
524
525 h->fields = p->fields;
526 h->done = 1;
527
528 h->version.start = p->version.str;
529 h->version.length = nxt_strlen(p->version.str);
530
531 h->method = p->method;
532
533 h->target.start = p->target_start;
534 h->target.length = p->target_end - p->target_start;
535
536 h->path = p->path;
537 h->query = p->args;
538
539 if (h->parsed_content_length == 0) {
540 b->done = 1;
541
542 }
543
544 if (buf->mem.free == buf->mem.pos) {
545 return NXT_DONE;
546 }
547
548 b->buf = buf;
549 b->done = nxt_buf_mem_used_size(&buf->mem) >=
550 h->parsed_content_length;
551
552 if (b->done == 1) {
553 b->preread_size = nxt_buf_mem_used_size(&buf->mem);
554 }
555
556 return NXT_DONE;
557}
558
559
560nxt_int_t
561nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
562 nxt_buf_t *buf)
563{
564 nxt_app_request_body_t *b;
565 nxt_app_request_header_t *h;
566
567 b = &ctx->r.body;
568 h = &ctx->r.header;
569
570 nxt_assert(h->done == 1);
571 nxt_assert(b->done == 0);
572
573 b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
574 (size_t) h->parsed_content_length;
575
576 if (b->done == 1) {
577 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
578 }
579
580 return b->done == 1 ? NXT_DONE : NXT_AGAIN;
581}
582
583
584nxt_int_t
585nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
586{
587 nxt_mp_destroy(ctx->mem_pool);
588
589 return NXT_OK;
590}
591
592
593nxt_int_t
594nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
595{
596 nxt_int_t rc;
597 nxt_buf_t *b;
598 nxt_port_t *port;
599
600 rc = NXT_OK;
601
602 port = nxt_app_msg_get_port(task, msg);
603 if (nxt_slow_path(port == NULL)) {
604 return NXT_ERROR;
605 }
606
607 if (nxt_slow_path(last == 1)) {
608 do {
609 b = *msg->buf;
610
611 if (b == NULL) {
612 b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
613 *msg->buf = b;
614 break;
615 }
616
617 msg->buf = &b->next;
618 } while(1);
619 }
620
621 if (nxt_slow_path(msg->write != NULL)) {
622 rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
623 -1, msg->stream, 0, msg->write);
624
625 msg->write = NULL;
626 msg->buf = &msg->write;
627 }
628
629 return rc;
630}
631
632
633nxt_int_t
634nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
635 size_t size)
636{
637 size_t free_size, copy_size;
638 nxt_buf_t *b;
639 nxt_port_t *port;
640
641 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
642
643 while (size > 0) {
644 b = *msg->buf;
645
646 if (b == NULL) {
647 port = nxt_app_msg_get_port(task, msg);
648 if (nxt_slow_path(port == NULL)) {
649 return NXT_ERROR;
650 }
651
652 b = nxt_port_mmap_get_buf(task, port, size);
653 if (nxt_slow_path(b == NULL)) {
654 return NXT_ERROR;
655 }
656
657 *msg->buf = b;
658 }
659
660 do {
661 free_size = nxt_buf_mem_free_size(&b->mem);
662
663 if (free_size > 0) {
664 copy_size = nxt_min(free_size, size);
665
666 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
667
668 size -= copy_size;
669 c += copy_size;
670
671 if (size == 0) {
672 return NXT_OK;
673 }
674 }
675 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
676
677 msg->buf = &b->next;
678 }
679
680 return NXT_OK;
681}
682
683
315nxt_int_t
316nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt)
317{
318 nxt_http_fields_hash_t *hash;
319
320 hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool);
321 if (nxt_slow_path(hash == NULL)) {
322 return NXT_ERROR;
323 }
324
325 nxt_app_request_fields_hash = hash;
326
327 return NXT_OK;
328}
329
330
331void
332nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
333{
334 size_t dump_size;
335 nxt_buf_t *b;
336 nxt_port_t *port;
337 nxt_app_rmsg_t rmsg = { msg->buf };
338 nxt_app_wmsg_t wmsg;
339
340 b = msg->buf;
341 dump_size = b->mem.free - b->mem.pos;
342
343 if (dump_size > 300) {
344 dump_size = 300;
345 }
346
347 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
348
349 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
350 msg->port_msg.reply_port);
351 if (nxt_slow_path(port == NULL)) {
352 //
353 }
354
355 wmsg.port = port;
356 wmsg.write = NULL;
357 wmsg.buf = &wmsg.write;
358 wmsg.stream = msg->port_msg.stream;
359
360 nxt_app->run(task, &rmsg, &wmsg);
361}
362
363
364nxt_inline nxt_port_t *
365nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg)
366{
367 return msg->port;
368}
369
370
371u_char *
372nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
373{
374 size_t free_size;
375 u_char *res;
376 nxt_buf_t *b;
377 nxt_port_t *port;
378
379 res = NULL;
380
381 do {
382 b = *msg->buf;
383
384 if (b == NULL) {
385 port = nxt_app_msg_get_port(task, msg);
386 if (nxt_slow_path(port == NULL)) {
387 return NULL;
388 }
389
390 b = nxt_port_mmap_get_buf(task, port, size);
391 if (nxt_slow_path(b == NULL)) {
392 return NULL;
393 }
394
395 *msg->buf = b;
396
397 free_size = nxt_buf_mem_free_size(&b->mem);
398
399 if (nxt_slow_path(free_size < size)) {
400 nxt_debug(task, "requested buffer too big (%z < %z)",
401 free_size, size);
402 return NULL;
403 }
404
405 }
406
407 free_size = nxt_buf_mem_free_size(&b->mem);
408
409 if (free_size >= size) {
410 res = b->mem.free;
411 b->mem.free += size;
412
413 return res;
414 }
415
416 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
417 res = b->mem.free;
418 b->mem.free += size;
419
420 return res;
421 }
422
423 msg->buf = &b->next;
424 } while(1);
425}
426
427
428nxt_int_t
429nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
430{
431 u_char *dst;
432 size_t dst_length;
433
434 if (c != NULL) {
435 dst_length = size + (size < 128 ? 1 : 4) + 1;
436
437 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
438 if (nxt_slow_path(dst == NULL)) {
439 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
440 dst_length);
441 return NXT_ERROR;
442 }
443
444 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
445
446 nxt_memcpy(dst, c, size);
447 dst[size] = 0;
448
449 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c);
450 } else {
451 dst_length = 1;
452
453 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
454 if (nxt_slow_path(dst == NULL)) {
455 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
456 dst_length);
457 return NXT_ERROR;
458 }
459
460 dst = nxt_app_msg_write_length(dst, 0);
461
462 nxt_debug(task, "nxt_app_msg_write: NULL");
463 }
464
465 return NXT_OK;
466}
467
468
469nxt_int_t
470nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
471 const nxt_str_t *prefix, const nxt_str_t *v)
472{
473 u_char *dst, *src;
474 size_t i, length, dst_length;
475
476 length = prefix->length + v->length;
477
478 dst_length = length + (length < 128 ? 1 : 4) + 1;
479
480 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
481 if (nxt_slow_path(dst == NULL)) {
482 return NXT_ERROR;
483 }
484
485 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
486
487 nxt_memcpy(dst, prefix->start, prefix->length);
488 dst += prefix->length;
489
490 src = v->start;
491 for (i = 0; i < v->length; i++, dst++, src++) {
492
493 if (*src >= 'a' && *src <= 'z') {
494 *dst = *src & ~0x20;
495 continue;
496 }
497
498 if (*src == '-') {
499 *dst = '_';
500 continue;
501 }
502
503 *dst = *src;
504 }
505
506 *dst = 0;
507
508 return NXT_OK;
509}
510
511
512nxt_int_t
513nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
514{
515 size_t length;
516 nxt_buf_t *buf;
517
518 do {
519 buf = msg->buf;
520
521 if (nxt_slow_path(buf == NULL)) {
522 return NXT_DONE;
523 }
524
525 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
526 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
527 msg->buf = buf->next;
528 continue;
529 }
530 return NXT_ERROR;
531 }
532
533 if (buf->mem.pos[0] >= 128) {
534 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
535 return NXT_ERROR;
536 }
537 }
538
539 break;
540 } while (1);
541
542 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length);
543
544 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length))
545 {
546 return NXT_ERROR;
547 }
548
549 if (length > 0) {
550 str->start = buf->mem.pos;
551 str->length = length - 1;
552
553 buf->mem.pos += length;
554
555 nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1,
556 (int)length - 1, str->start);
557 } else {
558 str->start = NULL;
559 str->length = 0;
560
561 nxt_debug(task, "nxt_read_str: NULL");
562 }
563
564 return NXT_OK;
565}
566
567
568size_t
569nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
570 size_t size)
571{
572 size_t res, read_size;
573 nxt_buf_t *buf;
574
575 res = 0;
576
577 while (size > 0) {
578 buf = msg->buf;
579
580 if (nxt_slow_path(buf == NULL)) {
581 break;
582 }
583
584 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
585 msg->buf = buf->next;
586 continue;
587 }
588
589 read_size = nxt_buf_mem_used_size(&buf->mem);
590 read_size = nxt_min(read_size, size);
591
592 dst = nxt_cpymem(dst, buf->mem.pos, read_size);
593
594 size -= read_size;
595 buf->mem.pos += read_size;
596 res += read_size;
597 }
598
599 nxt_debug(task, "nxt_read_raw: %uz", res);
600
601 return res;
602}
603
604
605nxt_int_t
606nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
607 nxt_str_t *v)
608{
609 nxt_int_t rc;
610
611 rc = nxt_app_msg_read_str(task, rmsg, n);
612 if (nxt_slow_path(rc != NXT_OK)) {
613 return rc;
614 }
615
616 rc = nxt_app_msg_read_str(task, rmsg, v);
617 if (nxt_slow_path(rc != NXT_OK)) {
618 return rc;
619 }
620
621 return rc;
622}
623
624
625nxt_int_t
626nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
627{
628 nxt_buf_t *buf;
629
630 do {
631 buf = msg->buf;
632
633 if (nxt_slow_path(buf == NULL)) {
634 return NXT_DONE;
635 }
636
637 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
638 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
639 msg->buf = buf->next;
640 continue;
641 }
642 return NXT_ERROR;
643 }
644
645 if (buf->mem.pos[0] >= 128) {
646 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
647 return NXT_ERROR;
648 }
649 }
650
651 break;
652 } while (1);
653
654 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
655
656 nxt_debug(task, "nxt_read_size: %d", (int)*size);
657
658 return NXT_OK;
659}
660
661
662static nxt_int_t
663nxt_app_request_content_length(void *ctx, nxt_http_field_t *field,
664 nxt_log_t *log)
665{
666 nxt_str_t *v;
667 nxt_app_parse_ctx_t *c;
668 nxt_app_request_header_t *h;
669
670 c = ctx;
671 h = &c->r.header;
672 v = &field->value;
673
674 h->content_length = *v;
675 h->parsed_content_length = nxt_off_t_parse(v->start, v->length);
676
677 return NXT_OK;
678}
679
680
681static nxt_int_t
682nxt_app_request_content_type(void *ctx, nxt_http_field_t *field,
683 nxt_log_t *log)
684{
685 nxt_app_parse_ctx_t *c;
686 nxt_app_request_header_t *h;
687
688 c = ctx;
689 h = &c->r.header;
690
691 h->content_type = field->value;
692
693 return NXT_OK;
694}
695
696
697static nxt_int_t
698nxt_app_request_cookie(void *ctx, nxt_http_field_t *field,
699 nxt_log_t *log)
700{
701 nxt_app_parse_ctx_t *c;
702 nxt_app_request_header_t *h;
703
704 c = ctx;
705 h = &c->r.header;
706
707 h->cookie = field->value;
708
709 return NXT_OK;
710}
711
712
713static nxt_int_t
714nxt_app_request_host(void *ctx, nxt_http_field_t *field,
715 nxt_log_t *log)
716{
717 nxt_app_parse_ctx_t *c;
718 nxt_app_request_header_t *h;
719
720 c = ctx;
721 h = &c->r.header;
722
723 h->host = field->value;
724
725 return NXT_OK;
726}
727
728
729static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = {
730 { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 },
731 { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 },
732 { nxt_string("Cookie"), &nxt_app_request_cookie, 0 },
733 { nxt_string("Host"), &nxt_app_request_host, 0 },
734
735 { nxt_null_string, NULL, 0 }
736};
737
738
739nxt_int_t
740nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
741{
742 nxt_int_t rc;
743
744 ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32);
745
746 rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool);
747 if (nxt_slow_path(rc != NXT_OK)) {
748 return rc;
749 }
750
751 ctx->parser.fields_hash = nxt_app_request_fields_hash;
752
753 return NXT_OK;
754}
755
756
757nxt_int_t
758nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
759 nxt_buf_t *buf)
760{
761 nxt_int_t rc;
762 nxt_app_request_body_t *b;
763 nxt_http_request_parse_t *p;
764 nxt_app_request_header_t *h;
765
766 p = &ctx->parser;
767 b = &ctx->r.body;
768 h = &ctx->r.header;
769
770 nxt_assert(h->done == 0);
771
772 rc = nxt_http_parse_request(p, &buf->mem);
773
774 if (nxt_slow_path(rc != NXT_DONE)) {
775 return rc;
776 }
777
778 rc = nxt_http_fields_process(p->fields, ctx, task->log);
779
780 if (nxt_slow_path(rc != NXT_OK)) {
781 return rc;
782 }
783
784 h->fields = p->fields;
785 h->done = 1;
786
787 h->version.start = p->version.str;
788 h->version.length = nxt_strlen(p->version.str);
789
790 h->method = p->method;
791
792 h->target.start = p->target_start;
793 h->target.length = p->target_end - p->target_start;
794
795 h->path = p->path;
796 h->query = p->args;
797
798 if (h->parsed_content_length == 0) {
799 b->done = 1;
800
801 }
802
803 if (buf->mem.free == buf->mem.pos) {
804 return NXT_DONE;
805 }
806
807 b->buf = buf;
808 b->done = nxt_buf_mem_used_size(&buf->mem) >=
809 h->parsed_content_length;
810
811 if (b->done == 1) {
812 b->preread_size = nxt_buf_mem_used_size(&buf->mem);
813 }
814
815 return NXT_DONE;
816}
817
818
819nxt_int_t
820nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
821 nxt_buf_t *buf)
822{
823 nxt_app_request_body_t *b;
824 nxt_app_request_header_t *h;
825
826 b = &ctx->r.body;
827 h = &ctx->r.header;
828
829 nxt_assert(h->done == 1);
830 nxt_assert(b->done == 0);
831
832 b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
833 (size_t) h->parsed_content_length;
834
835 if (b->done == 1) {
836 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
837 }
838
839 return b->done == 1 ? NXT_DONE : NXT_AGAIN;
840}
841
842
843nxt_int_t
844nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
845{
846 nxt_mp_destroy(ctx->mem_pool);
847
848 return NXT_OK;
849}
850
851
852nxt_int_t
853nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
854{
855 nxt_int_t rc;
856 nxt_buf_t *b;
857 nxt_port_t *port;
858
859 rc = NXT_OK;
860
861 port = nxt_app_msg_get_port(task, msg);
862 if (nxt_slow_path(port == NULL)) {
863 return NXT_ERROR;
864 }
865
866 if (nxt_slow_path(last == 1)) {
867 do {
868 b = *msg->buf;
869
870 if (b == NULL) {
871 b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
872 *msg->buf = b;
873 break;
874 }
875
876 msg->buf = &b->next;
877 } while(1);
878 }
879
880 if (nxt_slow_path(msg->write != NULL)) {
881 rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
882 -1, msg->stream, 0, msg->write);
883
884 msg->write = NULL;
885 msg->buf = &msg->write;
886 }
887
888 return rc;
889}
890
891
892nxt_int_t
893nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
894 size_t size)
895{
896 size_t free_size, copy_size;
897 nxt_buf_t *b;
898 nxt_port_t *port;
899
900 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
901
902 while (size > 0) {
903 b = *msg->buf;
904
905 if (b == NULL) {
906 port = nxt_app_msg_get_port(task, msg);
907 if (nxt_slow_path(port == NULL)) {
908 return NXT_ERROR;
909 }
910
911 b = nxt_port_mmap_get_buf(task, port, size);
912 if (nxt_slow_path(b == NULL)) {
913 return NXT_ERROR;
914 }
915
916 *msg->buf = b;
917 }
918
919 do {
920 free_size = nxt_buf_mem_free_size(&b->mem);
921
922 if (free_size > 0) {
923 copy_size = nxt_min(free_size, size);
924
925 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
926
927 size -= copy_size;
928 c += copy_size;
929
930 if (size == 0) {
931 return NXT_OK;
932 }
933 }
934 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
935
936 msg->buf = &b->next;
937 }
938
939 return NXT_OK;
940}
941
942
943nxt_app_lang_module_t *
944nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
945{
946 u_char *p, *end, *version;
947 size_t type_length, version_length;
948 nxt_uint_t i, n;
949 nxt_app_lang_module_t *lang;
950
951 end = name->start + name->length;
952 version = end;
953
954 for (p = name->start; p < end; p++) {
955 if (*p == ' ') {
956 version = p + 1;
957 break;
958 }
959
960 if (*p >= '0' && *p <= '9') {
961 version = p;
962 break;
963 }
964 }
965
966 type_length = p - name->start;
967 version_length = end - version;
968
969 lang = rt->languages->elts;
970 n = rt->languages->nelts;
971
972 for (i = 0; i < n; i++) {
973 if (nxt_str_eq(&lang[i].type, name->start, type_length)
974 && nxt_str_start(&lang[i].version, version, version_length))
975 {
976 return &lang[i];
977 }
978 }
979
980 return NULL;
981}
982
983
684nxt_app_type_t
685nxt_app_parse_type(nxt_str_t *str)
686{
687 if (nxt_str_eq(str, "python", 6)) {
688 return NXT_APP_PYTHON;
689
984nxt_app_type_t
985nxt_app_parse_type(nxt_str_t *str)
986{
987 if (nxt_str_eq(str, "python", 6)) {
988 return NXT_APP_PYTHON;
989
690 } else if (nxt_str_eq(str, "python2", 7)) {
691 return NXT_APP_PYTHON2;
692
693 } else if (nxt_str_eq(str, "python3", 7)) {
694 return NXT_APP_PYTHON3;
695
696 } else if (nxt_str_eq(str, "php", 3)) {
697 return NXT_APP_PHP;
698
990 } else if (nxt_str_eq(str, "php", 3)) {
991 return NXT_APP_PHP;
992
699 } else if (nxt_str_eq(str, "php5", 4)) {
700 return NXT_APP_PHP5;
701
702 } else if (nxt_str_eq(str, "php7", 4)) {
703 return NXT_APP_PHP7;
704
705 } else if (nxt_str_eq(str, "ruby", 4)) {
706 return NXT_APP_RUBY;
707
708 } else if (nxt_str_eq(str, "go", 2)) {
709 return NXT_APP_GO;
710
711 }
712
713 return NXT_APP_UNKNOWN;
714}
993 } else if (nxt_str_eq(str, "go", 2)) {
994 return NXT_APP_GO;
995
996 }
997
998 return NXT_APP_UNKNOWN;
999}