nxt_application.c (678:f05d36cbe461) nxt_application.c (703:2d536dde84d2)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) Igor Sysoev
5 * Copyright (C) Valentin V. Bartenev
6 * Copyright (C) NGINX, Inc.
7 */
8
9#include <nxt_main.h>
10#include <nxt_runtime.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13#include <nxt_http.h>
14#include <nxt_application.h>
15
16#include <glob.h>
17
18
19typedef struct {
20 nxt_app_type_t type;
21 nxt_str_t version;
22 nxt_str_t file;
23} nxt_module_t;
24
25
26static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
27static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
28 nxt_array_t *modules, const char *name);
29static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
32 void *data);
33static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
34 const char *name);
35static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
36
37static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
38
39
40static uint32_t compat[] = {
41 NXT_VERNUM, NXT_DEBUG,
42};
43
44
45nxt_str_t nxt_server = nxt_string(NXT_SERVER);
46
47
48static nxt_thread_mutex_t nxt_app_mutex;
49static nxt_thread_cond_t nxt_app_cond;
50
51static nxt_application_module_t *nxt_app;
52
53
54nxt_int_t
55nxt_discovery_start(nxt_task_t *task, void *data)
56{
57 uint32_t stream;
58 nxt_buf_t *b;
59 nxt_int_t ret;
60 nxt_port_t *main_port, *discovery_port;
61 nxt_runtime_t *rt;
62
63 nxt_debug(task, "DISCOVERY");
64
65 rt = task->thread->runtime;
66
67 b = nxt_discovery_modules(task, rt->modules);
68 if (nxt_slow_path(b == NULL)) {
69 return NXT_ERROR;
70 }
71
72 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
73 discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
74
75 stream = nxt_port_rpc_register_handler(task, discovery_port,
76 nxt_discovery_quit,
77 nxt_discovery_quit,
78 main_port->pid, NULL);
79
80 if (nxt_slow_path(stream == 0)) {
81 return NXT_ERROR;
82 }
83
84 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
85 stream, discovery_port->id, b);
86
87 if (nxt_slow_path(ret != NXT_OK)) {
88 nxt_port_rpc_cancel(task, discovery_port, stream);
89 return NXT_ERROR;
90 }
91
92 return NXT_OK;
93}
94
95
96static nxt_buf_t *
97nxt_discovery_modules(nxt_task_t *task, const char *path)
98{
99 char *name;
100 u_char *p, *end;
101 size_t size;
102 glob_t glb;
103 nxt_mp_t *mp;
104 nxt_buf_t *b;
105 nxt_int_t ret;
106 nxt_uint_t i, n;
107 nxt_array_t *modules;
108 nxt_module_t *module;
109
110 b = NULL;
111
112 mp = nxt_mp_create(1024, 128, 256, 32);
113 if (mp == NULL) {
114 return b;
115 }
116
117 ret = glob(path, 0, NULL, &glb);
118
119 n = glb.gl_pathc;
120
121 if (ret != 0) {
122 nxt_log(task, NXT_LOG_NOTICE,
123 "no modules matching: \"%s\" found", path);
124 n = 0;
125 }
126
127 modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
128 if (modules == NULL) {
129 goto fail;
130 }
131
132 for (i = 0; i < n; i++) {
133 name = glb.gl_pathv[i];
134
135 ret = nxt_discovery_module(task, mp, modules, name);
136 if (ret != NXT_OK) {
137 goto fail;
138 }
139 }
140
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) Igor Sysoev
5 * Copyright (C) Valentin V. Bartenev
6 * Copyright (C) NGINX, Inc.
7 */
8
9#include <nxt_main.h>
10#include <nxt_runtime.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13#include <nxt_http.h>
14#include <nxt_application.h>
15
16#include <glob.h>
17
18
19typedef struct {
20 nxt_app_type_t type;
21 nxt_str_t version;
22 nxt_str_t file;
23} nxt_module_t;
24
25
26static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
27static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
28 nxt_array_t *modules, const char *name);
29static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
32 void *data);
33static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
34 const char *name);
35static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
36
37static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
38
39
40static uint32_t compat[] = {
41 NXT_VERNUM, NXT_DEBUG,
42};
43
44
45nxt_str_t nxt_server = nxt_string(NXT_SERVER);
46
47
48static nxt_thread_mutex_t nxt_app_mutex;
49static nxt_thread_cond_t nxt_app_cond;
50
51static nxt_application_module_t *nxt_app;
52
53
54nxt_int_t
55nxt_discovery_start(nxt_task_t *task, void *data)
56{
57 uint32_t stream;
58 nxt_buf_t *b;
59 nxt_int_t ret;
60 nxt_port_t *main_port, *discovery_port;
61 nxt_runtime_t *rt;
62
63 nxt_debug(task, "DISCOVERY");
64
65 rt = task->thread->runtime;
66
67 b = nxt_discovery_modules(task, rt->modules);
68 if (nxt_slow_path(b == NULL)) {
69 return NXT_ERROR;
70 }
71
72 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
73 discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
74
75 stream = nxt_port_rpc_register_handler(task, discovery_port,
76 nxt_discovery_quit,
77 nxt_discovery_quit,
78 main_port->pid, NULL);
79
80 if (nxt_slow_path(stream == 0)) {
81 return NXT_ERROR;
82 }
83
84 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
85 stream, discovery_port->id, b);
86
87 if (nxt_slow_path(ret != NXT_OK)) {
88 nxt_port_rpc_cancel(task, discovery_port, stream);
89 return NXT_ERROR;
90 }
91
92 return NXT_OK;
93}
94
95
96static nxt_buf_t *
97nxt_discovery_modules(nxt_task_t *task, const char *path)
98{
99 char *name;
100 u_char *p, *end;
101 size_t size;
102 glob_t glb;
103 nxt_mp_t *mp;
104 nxt_buf_t *b;
105 nxt_int_t ret;
106 nxt_uint_t i, n;
107 nxt_array_t *modules;
108 nxt_module_t *module;
109
110 b = NULL;
111
112 mp = nxt_mp_create(1024, 128, 256, 32);
113 if (mp == NULL) {
114 return b;
115 }
116
117 ret = glob(path, 0, NULL, &glb);
118
119 n = glb.gl_pathc;
120
121 if (ret != 0) {
122 nxt_log(task, NXT_LOG_NOTICE,
123 "no modules matching: \"%s\" found", path);
124 n = 0;
125 }
126
127 modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
128 if (modules == NULL) {
129 goto fail;
130 }
131
132 for (i = 0; i < n; i++) {
133 name = glb.gl_pathv[i];
134
135 ret = nxt_discovery_module(task, mp, modules, name);
136 if (ret != NXT_OK) {
137 goto fail;
138 }
139 }
140
141 size = sizeof("[]") - 1;
141 size = nxt_length("[]");
142 module = modules->elts;
143 n = modules->nelts;
144
145 for (i = 0; i < n; i++) {
146 nxt_debug(task, "module: %d %V %V",
147 module[i].type, &module[i].version, &module[i].file);
148
142 module = modules->elts;
143 n = modules->nelts;
144
145 for (i = 0; i < n; i++) {
146 nxt_debug(task, "module: %d %V %V",
147 module[i].type, &module[i].version, &module[i].file);
148
149 size += sizeof("{\"type\": ,") - 1;
150 size += sizeof(" \"version\": \"\",") - 1;
151 size += sizeof(" \"file\": \"\"},") - 1;
149 size += nxt_length("{\"type\": ,");
150 size += nxt_length(" \"version\": \"\",");
151 size += nxt_length(" \"file\": \"\"},");
152
153 size += NXT_INT_T_LEN
154 + module[i].version.length
155 + module[i].file.length;
156 }
157
158 b = nxt_buf_mem_alloc(mp, size, 0);
159 if (b == NULL) {
160 goto fail;
161 }
162
163 b->completion_handler = nxt_discovery_completion_handler;
164
165 p = b->mem.free;
166 end = b->mem.end;
167 *p++ = '[';
168
169 for (i = 0; i < n; i++) {
170 p = nxt_sprintf(p, end,
171 "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
172 module[i].type, &module[i].version, &module[i].file);
173 }
174
175 *p++ = ']';
176 b->mem.free = p;
177
178fail:
179
180 globfree(&glb);
181
182 return b;
183}
184
185
186static nxt_int_t
187nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
188 const char *name)
189{
190 void *dl;
191 nxt_str_t version;
192 nxt_int_t ret;
193 nxt_uint_t i, n;
194 nxt_module_t *module;
195 nxt_app_type_t type;
196 nxt_application_module_t *app;
197
198 /*
199 * Only memory allocation failure should return NXT_ERROR.
200 * Any module processing errors are ignored.
201 */
202 ret = NXT_ERROR;
203
204 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
205
206 if (dl == NULL) {
207 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
208 return NXT_OK;
209 }
210
211 app = dlsym(dl, "nxt_app_module");
212
213 if (app != NULL) {
214 nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
215 &app->type, app->version, name);
216
217 if (app->compat_length != sizeof(compat)
218 || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
219 {
220 nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
221
222 goto done;
223 }
224
225 type = nxt_app_parse_type(app->type.start, app->type.length);
226
227 if (type == NXT_APP_UNKNOWN) {
228 nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
229
230 goto done;
231 }
232
233 module = modules->elts;
234 n = modules->nelts;
235
236 version.start = (u_char *) app->version;
237 version.length = nxt_strlen(app->version);
238
239 for (i = 0; i < n; i++) {
240 if (type == module[i].type
241 && nxt_strstr_eq(&module[i].version, &version))
242 {
243 nxt_log(task, NXT_LOG_NOTICE,
244 "ignoring %s module with the same "
245 "application language version %V %V as in %V",
246 name, &app->type, &version, &module[i].file);
247
248 goto done;
249 }
250 }
251
252 module = nxt_array_add(modules);
253 if (module == NULL) {
254 goto fail;
255 }
256
257 module->type = type;
258
259 nxt_str_dup(mp, &module->version, &version);
260 if (module->version.start == NULL) {
261 goto fail;
262 }
263
264 module->file.length = nxt_strlen(name);
265
266 module->file.start = nxt_mp_alloc(mp, module->file.length);
267 if (module->file.start == NULL) {
268 goto fail;
269 }
270
271 nxt_memcpy(module->file.start, name, module->file.length);
272
273 } else {
274 nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
275 }
276
277done:
278
279 ret = NXT_OK;
280
281fail:
282
283 if (dlclose(dl) != 0) {
284 nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
285 }
286
287 return ret;
288}
289
290
291static void
292nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
293{
294 nxt_mp_t *mp;
295 nxt_buf_t *b;
296
297 b = obj;
298 mp = b->data;
299
300 nxt_mp_destroy(mp);
301}
302
303
304static void
305nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
306{
307 nxt_worker_process_quit_handler(task, msg);
308}
309
310
311nxt_int_t
312nxt_app_start(nxt_task_t *task, void *data)
313{
314 nxt_int_t ret;
315 nxt_app_lang_module_t *lang;
316 nxt_common_app_conf_t *app_conf;
317
318 app_conf = data;
319
320 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
321 if (nxt_slow_path(lang == NULL)) {
322 nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
323 return NXT_ERROR;
324 }
325
326 nxt_app = lang->module;
327
328 if (nxt_app == NULL) {
329 nxt_debug(task, "application language module: %s \"%s\"",
330 lang->version, lang->file);
331
332 nxt_app = nxt_app_module_load(task, lang->file);
333 }
334
335 if (app_conf->working_directory != NULL
336 && app_conf->working_directory[0] != 0)
337 {
338 ret = chdir(app_conf->working_directory);
339
340 if (nxt_slow_path(ret != 0)) {
341 nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
342 app_conf->working_directory, nxt_errno);
343
344 return NXT_ERROR;
345 }
346 }
347
348 if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
349 != NXT_OK))
350 {
351 nxt_alert(task, "failed to set environment");
352 return NXT_ERROR;
353 }
354
355 if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
356 return NXT_ERROR;
357 }
358
359 if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
360 return NXT_ERROR;
361 }
362
363 ret = nxt_app->init(task, data);
364
365 if (nxt_slow_path(ret != NXT_OK)) {
366 nxt_debug(task, "application init failed");
367
368 } else {
369 nxt_debug(task, "application init done");
370 }
371
372 return ret;
373}
374
375
376static nxt_app_module_t *
377nxt_app_module_load(nxt_task_t *task, const char *name)
378{
379 void *dl;
380
381 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
382
383 if (dl != NULL) {
384 return dlsym(dl, "nxt_app_module");
385 }
386
387 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
388
389 return NULL;
390}
391
392
393static nxt_int_t
394nxt_app_set_environment(nxt_conf_value_t *environment)
395{
396 char *env, *p;
397 uint32_t next;
398 nxt_str_t name, value;
399 nxt_conf_value_t *value_obj;
400
401 if (environment != NULL) {
402 next = 0;
403
404 for ( ;; ) {
405 value_obj = nxt_conf_next_object_member(environment, &name, &next);
406 if (value_obj == NULL) {
407 break;
408 }
409
410 nxt_conf_get_string(value_obj, &value);
411
412 env = nxt_malloc(name.length + value.length + 2);
413 if (nxt_slow_path(env == NULL)) {
414 return NXT_ERROR;
415 }
416
417 p = nxt_cpymem(env, name.start, name.length);
418 *p++ = '=';
419 p = nxt_cpymem(p, value.start, value.length);
420 *p = '\0';
421
422 if (nxt_slow_path(putenv(env) != 0)) {
423 return NXT_ERROR;
424 }
425 }
426 }
427
428 return NXT_OK;
429}
430
431
432void
433nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
434{
435 if (nxt_app->atexit != NULL) {
436 nxt_app->atexit(task);
437 }
438
439 nxt_worker_process_quit_handler(task, msg);
440}
441
442
443void
444nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
445{
446 size_t dump_size;
447 nxt_int_t res;
448 nxt_buf_t *b;
449 nxt_port_t *port;
450 nxt_app_rmsg_t rmsg = { msg->buf };
451 nxt_app_wmsg_t wmsg;
452
453 b = msg->buf;
454 dump_size = b->mem.free - b->mem.pos;
455
456 if (dump_size > 300) {
457 dump_size = 300;
458 }
459
460 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
461
462 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
463 msg->port_msg.reply_port);
464 if (nxt_slow_path(port == NULL)) {
465 nxt_debug(task, "stream #%uD: reply port %d not found",
466 msg->port_msg.stream, msg->port_msg.reply_port);
467 return;
468 }
469
470 wmsg.port = port;
471 wmsg.write = NULL;
472 wmsg.buf = &wmsg.write;
473 wmsg.stream = msg->port_msg.stream;
474
475 res = nxt_app->run(task, &rmsg, &wmsg);
476
477 if (nxt_slow_path(res != NXT_OK)) {
478 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
479 msg->port_msg.stream, 0, NULL);
480 }
481}
482
483
484u_char *
485nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
486{
487 size_t free_size;
488 u_char *res;
489 nxt_buf_t *b;
490
491 res = NULL;
492
493 do {
494 b = *msg->buf;
495
496 if (b == NULL) {
497 b = nxt_port_mmap_get_buf(task, msg->port, size);
498 if (nxt_slow_path(b == NULL)) {
499 return NULL;
500 }
501
502 *msg->buf = b;
503
504 free_size = nxt_buf_mem_free_size(&b->mem);
505
506 if (nxt_slow_path(free_size < size)) {
507 nxt_log(task, NXT_LOG_WARN, "requested buffer too big "
508 "(%z < %z)", free_size, size);
509 return NULL;
510 }
511
512 }
513
514 free_size = nxt_buf_mem_free_size(&b->mem);
515
516 if (free_size >= size) {
517 res = b->mem.free;
518 b->mem.free += size;
519
520 return res;
521 }
522
523 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
524 res = b->mem.free;
525 b->mem.free += size;
526
527 return res;
528 }
529
530 msg->buf = &b->next;
531 } while(1);
532}
533
534
535nxt_int_t
536nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
537{
538 u_char *dst;
539 size_t dst_length;
540
541 if (c != NULL) {
542 dst_length = size + (size < 128 ? 1 : 4) + 1;
543
544 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
545 if (nxt_slow_path(dst == NULL)) {
546 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
547 dst_length);
548 return NXT_ERROR;
549 }
550
551 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
552
553 nxt_memcpy(dst, c, size);
554 dst[size] = 0;
555
556 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c);
557
558 } else {
559 dst_length = 1;
560
561 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
562 if (nxt_slow_path(dst == NULL)) {
563 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
564 dst_length);
565 return NXT_ERROR;
566 }
567
568 dst = nxt_app_msg_write_length(dst, 0);
569
570 nxt_debug(task, "nxt_app_msg_write: NULL");
571 }
572
573 return NXT_OK;
574}
575
576
577nxt_int_t
578nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
579 const nxt_str_t *prefix, u_char *c, size_t size)
580{
581 u_char *dst, *src;
582 size_t i, length, dst_length;
583
584 length = prefix->length + size;
585
586 dst_length = length + (length < 128 ? 1 : 4) + 1;
587
588 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
589 if (nxt_slow_path(dst == NULL)) {
590 return NXT_ERROR;
591 }
592
593 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
594
595 nxt_memcpy(dst, prefix->start, prefix->length);
596 dst += prefix->length;
597
598 src = c;
599 for (i = 0; i < size; i++, dst++, src++) {
600
601 if (*src >= 'a' && *src <= 'z') {
602 *dst = *src & ~0x20;
603 continue;
604 }
605
606 if (*src == '-') {
607 *dst = '_';
608 continue;
609 }
610
611 *dst = *src;
612 }
613
614 *dst = 0;
615
616 return NXT_OK;
617}
618
619
620nxt_inline nxt_int_t
621nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
622{
623 nxt_buf_t *buf;
624
625 do {
626 buf = msg->buf;
627
628 if (nxt_slow_path(buf == NULL)) {
629 return NXT_DONE;
630 }
631
632 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
633 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
634 msg->buf = buf->next;
635 continue;
636 }
637 return NXT_ERROR;
638 }
639
640 if (buf->mem.pos[0] >= 128) {
641 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
642 return NXT_ERROR;
643 }
644 }
645
646 break;
647 } while (1);
648
649 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
650
651 return NXT_OK;
652}
653
654
655nxt_int_t
656nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
657{
658 size_t length;
659 nxt_int_t ret;
660 nxt_buf_t *buf;
661
662 ret = nxt_app_msg_read_size_(task, msg, &length);
663 if (ret != NXT_OK) {
664 return ret;
665 }
666
667 buf = msg->buf;
668
669 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
670 return NXT_ERROR;
671 }
672
673 if (length > 0) {
674 str->start = buf->mem.pos;
675 str->length = length - 1;
676
677 buf->mem.pos += length;
678
679 nxt_debug(task, "nxt_read_str: %uz %*s", length - 1,
680 length - 1, str->start);
681
682 } else {
683 str->start = NULL;
684 str->length = 0;
685
686 nxt_debug(task, "nxt_read_str: NULL");
687 }
688
689 return NXT_OK;
690}
691
692
693size_t
694nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
695 size_t size)
696{
697 size_t res, read_size;
698 nxt_buf_t *buf;
699
700 res = 0;
701
702 while (size > 0) {
703 buf = msg->buf;
704
705 if (nxt_slow_path(buf == NULL)) {
706 break;
707 }
708
709 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
710 msg->buf = buf->next;
711 continue;
712 }
713
714 read_size = nxt_buf_mem_used_size(&buf->mem);
715 read_size = nxt_min(read_size, size);
716
717 dst = nxt_cpymem(dst, buf->mem.pos, read_size);
718
719 size -= read_size;
720 buf->mem.pos += read_size;
721 res += read_size;
722 }
723
724 nxt_debug(task, "nxt_read_raw: %uz", res);
725
726 return res;
727}
728
729
730nxt_int_t
731nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
732 nxt_str_t *v)
733{
734 nxt_int_t rc;
735
736 rc = nxt_app_msg_read_str(task, rmsg, n);
737 if (nxt_slow_path(rc != NXT_OK)) {
738 return rc;
739 }
740
741 rc = nxt_app_msg_read_str(task, rmsg, v);
742 if (nxt_slow_path(rc != NXT_OK)) {
743 return rc;
744 }
745
746 return rc;
747}
748
749
750nxt_int_t
751nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
752{
753 nxt_int_t ret;
754
755 ret = nxt_app_msg_read_size_(task, msg, size);
756
757 nxt_debug(task, "nxt_read_size: %d", (int) *size);
758
759 return ret;
760}
761
762
763nxt_int_t
764nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
765{
766 ar->timer.handler = nxt_app_http_release;
767 nxt_timer_add(task->thread->engine, &ar->timer, 0);
768
769 return NXT_OK;
770}
771
772
773static void
774nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
775{
776 nxt_timer_t *timer;
777 nxt_app_parse_ctx_t *ar;
778
779 timer = obj;
780
781 nxt_debug(task, "http app release");
782
783 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
784
785 nxt_mp_release(ar->request->mem_pool);
786}
787
788
789nxt_int_t
790nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
791{
792 nxt_int_t rc;
793 nxt_buf_t *b;
794
795 rc = NXT_OK;
796
797 if (nxt_slow_path(last == 1)) {
798 do {
799 b = *msg->buf;
800
801 if (b == NULL) {
802 b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST);
803 *msg->buf = b;
804 break;
805 }
806
807 msg->buf = &b->next;
808 } while(1);
809 }
810
811 if (nxt_slow_path(msg->write != NULL)) {
812 rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA,
813 -1, msg->stream, 0, msg->write);
814
815 msg->write = NULL;
816 msg->buf = &msg->write;
817 }
818
819 return rc;
820}
821
822
823nxt_int_t
824nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
825 size_t size)
826{
827 size_t free_size, copy_size;
828 nxt_buf_t *b;
829
830 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
831
832 while (size > 0) {
833 b = *msg->buf;
834
835 if (b == NULL) {
836 b = nxt_port_mmap_get_buf(task, msg->port, size);
837 if (nxt_slow_path(b == NULL)) {
838 return NXT_ERROR;
839 }
840
841 *msg->buf = b;
842 }
843
844 do {
845 free_size = nxt_buf_mem_free_size(&b->mem);
846
847 if (free_size > 0) {
848 copy_size = nxt_min(free_size, size);
849
850 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
851
852 size -= copy_size;
853 c += copy_size;
854
855 if (size == 0) {
856 return NXT_OK;
857 }
858 }
859 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
860
861 msg->buf = &b->next;
862 }
863
864 return NXT_OK;
865}
866
867
868nxt_app_lang_module_t *
869nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
870{
871 u_char *p, *end, *version;
872 size_t version_length;
873 nxt_uint_t i, n;
874 nxt_app_type_t type;
875 nxt_app_lang_module_t *lang;
876
877 end = name->start + name->length;
878 version = end;
879
880 for (p = name->start; p < end; p++) {
881 if (*p == ' ') {
882 version = p + 1;
883 break;
884 }
885
886 if (*p >= '0' && *p <= '9') {
887 version = p;
888 break;
889 }
890 }
891
892 type = nxt_app_parse_type(name->start, p - name->start);
893
894 if (type == NXT_APP_UNKNOWN) {
895 return NULL;
896 }
897
898 version_length = end - version;
899
900 lang = rt->languages->elts;
901 n = rt->languages->nelts;
902
903 for (i = 0; i < n; i++) {
904
905 /*
906 * Versions are sorted in descending order
907 * so first match chooses the highest version.
908 */
909
910 if (lang[i].type == type
911 && nxt_strvers_match(lang[i].version, version, version_length))
912 {
913 return &lang[i];
914 }
915 }
916
917 return NULL;
918}
919
920
921nxt_app_type_t
922nxt_app_parse_type(u_char *p, size_t length)
923{
924 nxt_str_t str;
925
926 str.length = length;
927 str.start = p;
928
929 if (nxt_str_eq(&str, "python", 6)) {
930 return NXT_APP_PYTHON;
931
932 } else if (nxt_str_eq(&str, "php", 3)) {
933 return NXT_APP_PHP;
934
935 } else if (nxt_str_eq(&str, "go", 2)) {
936 return NXT_APP_GO;
937
938 } else if (nxt_str_eq(&str, "perl", 4)) {
939 return NXT_APP_PERL;
940
941 } else if (nxt_str_eq(&str, "ruby", 4)) {
942 return NXT_APP_RUBY;
943 }
944
945 return NXT_APP_UNKNOWN;
946}
152
153 size += NXT_INT_T_LEN
154 + module[i].version.length
155 + module[i].file.length;
156 }
157
158 b = nxt_buf_mem_alloc(mp, size, 0);
159 if (b == NULL) {
160 goto fail;
161 }
162
163 b->completion_handler = nxt_discovery_completion_handler;
164
165 p = b->mem.free;
166 end = b->mem.end;
167 *p++ = '[';
168
169 for (i = 0; i < n; i++) {
170 p = nxt_sprintf(p, end,
171 "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
172 module[i].type, &module[i].version, &module[i].file);
173 }
174
175 *p++ = ']';
176 b->mem.free = p;
177
178fail:
179
180 globfree(&glb);
181
182 return b;
183}
184
185
186static nxt_int_t
187nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
188 const char *name)
189{
190 void *dl;
191 nxt_str_t version;
192 nxt_int_t ret;
193 nxt_uint_t i, n;
194 nxt_module_t *module;
195 nxt_app_type_t type;
196 nxt_application_module_t *app;
197
198 /*
199 * Only memory allocation failure should return NXT_ERROR.
200 * Any module processing errors are ignored.
201 */
202 ret = NXT_ERROR;
203
204 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
205
206 if (dl == NULL) {
207 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
208 return NXT_OK;
209 }
210
211 app = dlsym(dl, "nxt_app_module");
212
213 if (app != NULL) {
214 nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
215 &app->type, app->version, name);
216
217 if (app->compat_length != sizeof(compat)
218 || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
219 {
220 nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
221
222 goto done;
223 }
224
225 type = nxt_app_parse_type(app->type.start, app->type.length);
226
227 if (type == NXT_APP_UNKNOWN) {
228 nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
229
230 goto done;
231 }
232
233 module = modules->elts;
234 n = modules->nelts;
235
236 version.start = (u_char *) app->version;
237 version.length = nxt_strlen(app->version);
238
239 for (i = 0; i < n; i++) {
240 if (type == module[i].type
241 && nxt_strstr_eq(&module[i].version, &version))
242 {
243 nxt_log(task, NXT_LOG_NOTICE,
244 "ignoring %s module with the same "
245 "application language version %V %V as in %V",
246 name, &app->type, &version, &module[i].file);
247
248 goto done;
249 }
250 }
251
252 module = nxt_array_add(modules);
253 if (module == NULL) {
254 goto fail;
255 }
256
257 module->type = type;
258
259 nxt_str_dup(mp, &module->version, &version);
260 if (module->version.start == NULL) {
261 goto fail;
262 }
263
264 module->file.length = nxt_strlen(name);
265
266 module->file.start = nxt_mp_alloc(mp, module->file.length);
267 if (module->file.start == NULL) {
268 goto fail;
269 }
270
271 nxt_memcpy(module->file.start, name, module->file.length);
272
273 } else {
274 nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
275 }
276
277done:
278
279 ret = NXT_OK;
280
281fail:
282
283 if (dlclose(dl) != 0) {
284 nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
285 }
286
287 return ret;
288}
289
290
291static void
292nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
293{
294 nxt_mp_t *mp;
295 nxt_buf_t *b;
296
297 b = obj;
298 mp = b->data;
299
300 nxt_mp_destroy(mp);
301}
302
303
304static void
305nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
306{
307 nxt_worker_process_quit_handler(task, msg);
308}
309
310
311nxt_int_t
312nxt_app_start(nxt_task_t *task, void *data)
313{
314 nxt_int_t ret;
315 nxt_app_lang_module_t *lang;
316 nxt_common_app_conf_t *app_conf;
317
318 app_conf = data;
319
320 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
321 if (nxt_slow_path(lang == NULL)) {
322 nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
323 return NXT_ERROR;
324 }
325
326 nxt_app = lang->module;
327
328 if (nxt_app == NULL) {
329 nxt_debug(task, "application language module: %s \"%s\"",
330 lang->version, lang->file);
331
332 nxt_app = nxt_app_module_load(task, lang->file);
333 }
334
335 if (app_conf->working_directory != NULL
336 && app_conf->working_directory[0] != 0)
337 {
338 ret = chdir(app_conf->working_directory);
339
340 if (nxt_slow_path(ret != 0)) {
341 nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
342 app_conf->working_directory, nxt_errno);
343
344 return NXT_ERROR;
345 }
346 }
347
348 if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
349 != NXT_OK))
350 {
351 nxt_alert(task, "failed to set environment");
352 return NXT_ERROR;
353 }
354
355 if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
356 return NXT_ERROR;
357 }
358
359 if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
360 return NXT_ERROR;
361 }
362
363 ret = nxt_app->init(task, data);
364
365 if (nxt_slow_path(ret != NXT_OK)) {
366 nxt_debug(task, "application init failed");
367
368 } else {
369 nxt_debug(task, "application init done");
370 }
371
372 return ret;
373}
374
375
376static nxt_app_module_t *
377nxt_app_module_load(nxt_task_t *task, const char *name)
378{
379 void *dl;
380
381 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
382
383 if (dl != NULL) {
384 return dlsym(dl, "nxt_app_module");
385 }
386
387 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
388
389 return NULL;
390}
391
392
393static nxt_int_t
394nxt_app_set_environment(nxt_conf_value_t *environment)
395{
396 char *env, *p;
397 uint32_t next;
398 nxt_str_t name, value;
399 nxt_conf_value_t *value_obj;
400
401 if (environment != NULL) {
402 next = 0;
403
404 for ( ;; ) {
405 value_obj = nxt_conf_next_object_member(environment, &name, &next);
406 if (value_obj == NULL) {
407 break;
408 }
409
410 nxt_conf_get_string(value_obj, &value);
411
412 env = nxt_malloc(name.length + value.length + 2);
413 if (nxt_slow_path(env == NULL)) {
414 return NXT_ERROR;
415 }
416
417 p = nxt_cpymem(env, name.start, name.length);
418 *p++ = '=';
419 p = nxt_cpymem(p, value.start, value.length);
420 *p = '\0';
421
422 if (nxt_slow_path(putenv(env) != 0)) {
423 return NXT_ERROR;
424 }
425 }
426 }
427
428 return NXT_OK;
429}
430
431
432void
433nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
434{
435 if (nxt_app->atexit != NULL) {
436 nxt_app->atexit(task);
437 }
438
439 nxt_worker_process_quit_handler(task, msg);
440}
441
442
443void
444nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
445{
446 size_t dump_size;
447 nxt_int_t res;
448 nxt_buf_t *b;
449 nxt_port_t *port;
450 nxt_app_rmsg_t rmsg = { msg->buf };
451 nxt_app_wmsg_t wmsg;
452
453 b = msg->buf;
454 dump_size = b->mem.free - b->mem.pos;
455
456 if (dump_size > 300) {
457 dump_size = 300;
458 }
459
460 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
461
462 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
463 msg->port_msg.reply_port);
464 if (nxt_slow_path(port == NULL)) {
465 nxt_debug(task, "stream #%uD: reply port %d not found",
466 msg->port_msg.stream, msg->port_msg.reply_port);
467 return;
468 }
469
470 wmsg.port = port;
471 wmsg.write = NULL;
472 wmsg.buf = &wmsg.write;
473 wmsg.stream = msg->port_msg.stream;
474
475 res = nxt_app->run(task, &rmsg, &wmsg);
476
477 if (nxt_slow_path(res != NXT_OK)) {
478 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
479 msg->port_msg.stream, 0, NULL);
480 }
481}
482
483
484u_char *
485nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
486{
487 size_t free_size;
488 u_char *res;
489 nxt_buf_t *b;
490
491 res = NULL;
492
493 do {
494 b = *msg->buf;
495
496 if (b == NULL) {
497 b = nxt_port_mmap_get_buf(task, msg->port, size);
498 if (nxt_slow_path(b == NULL)) {
499 return NULL;
500 }
501
502 *msg->buf = b;
503
504 free_size = nxt_buf_mem_free_size(&b->mem);
505
506 if (nxt_slow_path(free_size < size)) {
507 nxt_log(task, NXT_LOG_WARN, "requested buffer too big "
508 "(%z < %z)", free_size, size);
509 return NULL;
510 }
511
512 }
513
514 free_size = nxt_buf_mem_free_size(&b->mem);
515
516 if (free_size >= size) {
517 res = b->mem.free;
518 b->mem.free += size;
519
520 return res;
521 }
522
523 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
524 res = b->mem.free;
525 b->mem.free += size;
526
527 return res;
528 }
529
530 msg->buf = &b->next;
531 } while(1);
532}
533
534
535nxt_int_t
536nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
537{
538 u_char *dst;
539 size_t dst_length;
540
541 if (c != NULL) {
542 dst_length = size + (size < 128 ? 1 : 4) + 1;
543
544 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
545 if (nxt_slow_path(dst == NULL)) {
546 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
547 dst_length);
548 return NXT_ERROR;
549 }
550
551 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
552
553 nxt_memcpy(dst, c, size);
554 dst[size] = 0;
555
556 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c);
557
558 } else {
559 dst_length = 1;
560
561 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
562 if (nxt_slow_path(dst == NULL)) {
563 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
564 dst_length);
565 return NXT_ERROR;
566 }
567
568 dst = nxt_app_msg_write_length(dst, 0);
569
570 nxt_debug(task, "nxt_app_msg_write: NULL");
571 }
572
573 return NXT_OK;
574}
575
576
577nxt_int_t
578nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
579 const nxt_str_t *prefix, u_char *c, size_t size)
580{
581 u_char *dst, *src;
582 size_t i, length, dst_length;
583
584 length = prefix->length + size;
585
586 dst_length = length + (length < 128 ? 1 : 4) + 1;
587
588 dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
589 if (nxt_slow_path(dst == NULL)) {
590 return NXT_ERROR;
591 }
592
593 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
594
595 nxt_memcpy(dst, prefix->start, prefix->length);
596 dst += prefix->length;
597
598 src = c;
599 for (i = 0; i < size; i++, dst++, src++) {
600
601 if (*src >= 'a' && *src <= 'z') {
602 *dst = *src & ~0x20;
603 continue;
604 }
605
606 if (*src == '-') {
607 *dst = '_';
608 continue;
609 }
610
611 *dst = *src;
612 }
613
614 *dst = 0;
615
616 return NXT_OK;
617}
618
619
620nxt_inline nxt_int_t
621nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
622{
623 nxt_buf_t *buf;
624
625 do {
626 buf = msg->buf;
627
628 if (nxt_slow_path(buf == NULL)) {
629 return NXT_DONE;
630 }
631
632 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
633 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
634 msg->buf = buf->next;
635 continue;
636 }
637 return NXT_ERROR;
638 }
639
640 if (buf->mem.pos[0] >= 128) {
641 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
642 return NXT_ERROR;
643 }
644 }
645
646 break;
647 } while (1);
648
649 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
650
651 return NXT_OK;
652}
653
654
655nxt_int_t
656nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
657{
658 size_t length;
659 nxt_int_t ret;
660 nxt_buf_t *buf;
661
662 ret = nxt_app_msg_read_size_(task, msg, &length);
663 if (ret != NXT_OK) {
664 return ret;
665 }
666
667 buf = msg->buf;
668
669 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
670 return NXT_ERROR;
671 }
672
673 if (length > 0) {
674 str->start = buf->mem.pos;
675 str->length = length - 1;
676
677 buf->mem.pos += length;
678
679 nxt_debug(task, "nxt_read_str: %uz %*s", length - 1,
680 length - 1, str->start);
681
682 } else {
683 str->start = NULL;
684 str->length = 0;
685
686 nxt_debug(task, "nxt_read_str: NULL");
687 }
688
689 return NXT_OK;
690}
691
692
693size_t
694nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
695 size_t size)
696{
697 size_t res, read_size;
698 nxt_buf_t *buf;
699
700 res = 0;
701
702 while (size > 0) {
703 buf = msg->buf;
704
705 if (nxt_slow_path(buf == NULL)) {
706 break;
707 }
708
709 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
710 msg->buf = buf->next;
711 continue;
712 }
713
714 read_size = nxt_buf_mem_used_size(&buf->mem);
715 read_size = nxt_min(read_size, size);
716
717 dst = nxt_cpymem(dst, buf->mem.pos, read_size);
718
719 size -= read_size;
720 buf->mem.pos += read_size;
721 res += read_size;
722 }
723
724 nxt_debug(task, "nxt_read_raw: %uz", res);
725
726 return res;
727}
728
729
730nxt_int_t
731nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
732 nxt_str_t *v)
733{
734 nxt_int_t rc;
735
736 rc = nxt_app_msg_read_str(task, rmsg, n);
737 if (nxt_slow_path(rc != NXT_OK)) {
738 return rc;
739 }
740
741 rc = nxt_app_msg_read_str(task, rmsg, v);
742 if (nxt_slow_path(rc != NXT_OK)) {
743 return rc;
744 }
745
746 return rc;
747}
748
749
750nxt_int_t
751nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
752{
753 nxt_int_t ret;
754
755 ret = nxt_app_msg_read_size_(task, msg, size);
756
757 nxt_debug(task, "nxt_read_size: %d", (int) *size);
758
759 return ret;
760}
761
762
763nxt_int_t
764nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
765{
766 ar->timer.handler = nxt_app_http_release;
767 nxt_timer_add(task->thread->engine, &ar->timer, 0);
768
769 return NXT_OK;
770}
771
772
773static void
774nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
775{
776 nxt_timer_t *timer;
777 nxt_app_parse_ctx_t *ar;
778
779 timer = obj;
780
781 nxt_debug(task, "http app release");
782
783 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
784
785 nxt_mp_release(ar->request->mem_pool);
786}
787
788
789nxt_int_t
790nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
791{
792 nxt_int_t rc;
793 nxt_buf_t *b;
794
795 rc = NXT_OK;
796
797 if (nxt_slow_path(last == 1)) {
798 do {
799 b = *msg->buf;
800
801 if (b == NULL) {
802 b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST);
803 *msg->buf = b;
804 break;
805 }
806
807 msg->buf = &b->next;
808 } while(1);
809 }
810
811 if (nxt_slow_path(msg->write != NULL)) {
812 rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA,
813 -1, msg->stream, 0, msg->write);
814
815 msg->write = NULL;
816 msg->buf = &msg->write;
817 }
818
819 return rc;
820}
821
822
823nxt_int_t
824nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
825 size_t size)
826{
827 size_t free_size, copy_size;
828 nxt_buf_t *b;
829
830 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
831
832 while (size > 0) {
833 b = *msg->buf;
834
835 if (b == NULL) {
836 b = nxt_port_mmap_get_buf(task, msg->port, size);
837 if (nxt_slow_path(b == NULL)) {
838 return NXT_ERROR;
839 }
840
841 *msg->buf = b;
842 }
843
844 do {
845 free_size = nxt_buf_mem_free_size(&b->mem);
846
847 if (free_size > 0) {
848 copy_size = nxt_min(free_size, size);
849
850 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
851
852 size -= copy_size;
853 c += copy_size;
854
855 if (size == 0) {
856 return NXT_OK;
857 }
858 }
859 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
860
861 msg->buf = &b->next;
862 }
863
864 return NXT_OK;
865}
866
867
868nxt_app_lang_module_t *
869nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
870{
871 u_char *p, *end, *version;
872 size_t version_length;
873 nxt_uint_t i, n;
874 nxt_app_type_t type;
875 nxt_app_lang_module_t *lang;
876
877 end = name->start + name->length;
878 version = end;
879
880 for (p = name->start; p < end; p++) {
881 if (*p == ' ') {
882 version = p + 1;
883 break;
884 }
885
886 if (*p >= '0' && *p <= '9') {
887 version = p;
888 break;
889 }
890 }
891
892 type = nxt_app_parse_type(name->start, p - name->start);
893
894 if (type == NXT_APP_UNKNOWN) {
895 return NULL;
896 }
897
898 version_length = end - version;
899
900 lang = rt->languages->elts;
901 n = rt->languages->nelts;
902
903 for (i = 0; i < n; i++) {
904
905 /*
906 * Versions are sorted in descending order
907 * so first match chooses the highest version.
908 */
909
910 if (lang[i].type == type
911 && nxt_strvers_match(lang[i].version, version, version_length))
912 {
913 return &lang[i];
914 }
915 }
916
917 return NULL;
918}
919
920
921nxt_app_type_t
922nxt_app_parse_type(u_char *p, size_t length)
923{
924 nxt_str_t str;
925
926 str.length = length;
927 str.start = p;
928
929 if (nxt_str_eq(&str, "python", 6)) {
930 return NXT_APP_PYTHON;
931
932 } else if (nxt_str_eq(&str, "php", 3)) {
933 return NXT_APP_PHP;
934
935 } else if (nxt_str_eq(&str, "go", 2)) {
936 return NXT_APP_GO;
937
938 } else if (nxt_str_eq(&str, "perl", 4)) {
939 return NXT_APP_PERL;
940
941 } else if (nxt_str_eq(&str, "ruby", 4)) {
942 return NXT_APP_RUBY;
943 }
944
945 return NXT_APP_UNKNOWN;
946}