nxt_unit.c (1810:9fcc8edf2201) nxt_unit.c (1980:43553aa72111)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 11 unchanged lines hidden (view full) ---

20#if (NXT_HAVE_MEMFD_CREATE)
21#include <linux/memfd.h>
22#endif
23
24#define NXT_UNIT_MAX_PLAIN_SIZE 1024
25#define NXT_UNIT_LOCAL_BUF_SIZE \
26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 11 unchanged lines hidden (view full) ---

20#if (NXT_HAVE_MEMFD_CREATE)
21#include <linux/memfd.h>
22#endif
23
24#define NXT_UNIT_MAX_PLAIN_SIZE 1024
25#define NXT_UNIT_LOCAL_BUF_SIZE \
26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27
28enum {
29 NXT_QUIT_NORMAL = 0,
30 NXT_QUIT_GRACEFUL = 1,
31};
32
28typedef struct nxt_unit_impl_s nxt_unit_impl_t;
29typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
30typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
31typedef struct nxt_unit_process_s nxt_unit_process_t;
32typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
33typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
34typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
35typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;

--- 10 unchanged lines hidden (view full) ---

46nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50 nxt_unit_mmap_buf_t *mmap_buf);
51nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
33typedef struct nxt_unit_impl_s nxt_unit_impl_t;
34typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
35typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
36typedef struct nxt_unit_process_s nxt_unit_process_t;
37typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
38typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
39typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
40typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;

--- 10 unchanged lines hidden (view full) ---

51nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
52nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
53 nxt_unit_mmap_buf_t *mmap_buf);
54nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
55 nxt_unit_mmap_buf_t *mmap_buf);
56nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
57static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
58 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
59 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60 uint32_t *request_limit);
55static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56 int queue_fd);
57static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
58 nxt_unit_request_info_t **preq);
59static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
60 nxt_unit_recv_msg_t *recv_msg);
61static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
62static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,

--- 62 unchanged lines hidden (view full) ---

125static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
126
127static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
128static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
129 pid_t pid, int remove);
130static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
131static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
132static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
61static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62 int queue_fd);
63static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64 nxt_unit_request_info_t **preq);
65static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,

--- 62 unchanged lines hidden (view full) ---

131static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132
133static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135 pid_t pid, int remove);
136static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
133static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
134static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
135nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
136nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
137nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
138nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
139static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
140 nxt_unit_port_t *port);

--- 4 unchanged lines hidden (view full) ---

145 nxt_unit_port_t *port, int queue_fd);
146
147nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
148nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
149static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
150 nxt_unit_port_t *port, void *queue);
151static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
152 nxt_queue_t *awaiting_req);
140static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147 nxt_unit_port_t *port);

--- 4 unchanged lines hidden (view full) ---

152 nxt_unit_port_t *port, int queue_fd);
153
154nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, void *queue);
158static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159 nxt_queue_t *awaiting_req);
153static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
160static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
154 nxt_unit_port_id_t *port_id);
155static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
156 nxt_unit_port_id_t *port_id);
157static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
158static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
159 nxt_unit_process_t *process);
161 nxt_unit_port_id_t *port_id);
162static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163 nxt_unit_port_id_t *port_id);
164static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166 nxt_unit_process_t *process);
160static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
167static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
161static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
162static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
163 nxt_unit_port_t *port, const void *buf, size_t buf_size,
164 const void *oob, size_t oob_size);
165static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
166 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
167static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
170 nxt_unit_read_buf_t *src);
171static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
173static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174 nxt_unit_read_buf_t *rbuf);
175static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
176 nxt_unit_read_buf_t *rbuf);
168static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170 nxt_unit_port_t *port, const void *buf, size_t buf_size,
171 const void *oob, size_t oob_size);
172static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
174static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175 nxt_unit_read_buf_t *rbuf);
176nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177 nxt_unit_read_buf_t *src);
178static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179 nxt_unit_read_buf_t *rbuf);
180static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181 nxt_unit_read_buf_t *rbuf);
182static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183 nxt_unit_read_buf_t *rbuf);
177static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
184static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178 nxt_unit_read_buf_t *rbuf);
179nxt_inline int nxt_unit_close(int fd);
180static int nxt_unit_fd_blocking(int fd);
181
182static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
183 nxt_unit_port_t *port);
184static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
185 nxt_unit_port_id_t *port_id, int remove);

--- 120 unchanged lines hidden (view full) ---

306 nxt_queue_t ready_req;
307
308 /* of nxt_unit_read_buf_t */
309 nxt_queue_t pending_rbuf;
310
311 /* of nxt_unit_read_buf_t */
312 nxt_queue_t free_rbuf;
313
185 nxt_unit_read_buf_t *rbuf);
186nxt_inline int nxt_unit_close(int fd);
187static int nxt_unit_fd_blocking(int fd);
188
189static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190 nxt_unit_port_t *port);
191static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192 nxt_unit_port_id_t *port_id, int remove);

--- 120 unchanged lines hidden (view full) ---

313 nxt_queue_t ready_req;
314
315 /* of nxt_unit_read_buf_t */
316 nxt_queue_t pending_rbuf;
317
318 /* of nxt_unit_read_buf_t */
319 nxt_queue_t free_rbuf;
320
314 int online;
315 int ready;
321 uint8_t online; /* 1 bit */
322 uint8_t ready; /* 1 bit */
323 uint8_t quit_param;
316
317 nxt_unit_mmap_buf_t ctx_buf[2];
318 nxt_unit_read_buf_t ctx_read_buf;
319
320 nxt_unit_request_info_impl_t req;
321};
322
323

--- 15 unchanged lines hidden (view full) ---

339};
340
341
342struct nxt_unit_impl_s {
343 nxt_unit_t unit;
344 nxt_unit_callbacks_t callbacks;
345
346 nxt_atomic_t use_count;
324
325 nxt_unit_mmap_buf_t ctx_buf[2];
326 nxt_unit_read_buf_t ctx_read_buf;
327
328 nxt_unit_request_info_impl_t req;
329};
330
331

--- 15 unchanged lines hidden (view full) ---

347};
348
349
350struct nxt_unit_impl_s {
351 nxt_unit_t unit;
352 nxt_unit_callbacks_t callbacks;
353
354 nxt_atomic_t use_count;
355 nxt_atomic_t request_count;
347
348 uint32_t request_data_size;
349 uint32_t shm_mmap_limit;
356
357 uint32_t request_data_size;
358 uint32_t shm_mmap_limit;
359 uint32_t request_limit;
350
351 pthread_mutex_t mutex;
352
353 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
354 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
355
356 nxt_unit_port_t *router_port;
357 nxt_unit_port_t *shared_port;

--- 51 unchanged lines hidden (view full) ---

409} nxt_unit_port_hash_id_t;
410
411
412nxt_unit_ctx_t *
413nxt_unit_init(nxt_unit_init_t *init)
414{
415 int rc, queue_fd;
416 void *mem;
360
361 pthread_mutex_t mutex;
362
363 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
364 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
365
366 nxt_unit_port_t *router_port;
367 nxt_unit_port_t *shared_port;

--- 51 unchanged lines hidden (view full) ---

419} nxt_unit_port_hash_id_t;
420
421
422nxt_unit_ctx_t *
423nxt_unit_init(nxt_unit_init_t *init)
424{
425 int rc, queue_fd;
426 void *mem;
417 uint32_t ready_stream, shm_limit;
427 uint32_t ready_stream, shm_limit, request_limit;
418 nxt_unit_ctx_t *ctx;
419 nxt_unit_impl_t *lib;
420 nxt_unit_port_t ready_port, router_port, read_port;
421
422 lib = nxt_unit_create(init);
423 if (nxt_slow_path(lib == NULL)) {
424 return NULL;
425 }

--- 15 unchanged lines hidden (view full) ---

441 ready_port.id.id);
442 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
443 router_port.id.id);
444 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
445 read_port.id.id);
446
447 } else {
448 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
428 nxt_unit_ctx_t *ctx;
429 nxt_unit_impl_t *lib;
430 nxt_unit_port_t ready_port, router_port, read_port;
431
432 lib = nxt_unit_create(init);
433 if (nxt_slow_path(lib == NULL)) {
434 return NULL;
435 }

--- 15 unchanged lines hidden (view full) ---

451 ready_port.id.id);
452 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
453 router_port.id.id);
454 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
455 read_port.id.id);
456
457 } else {
458 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
449 &lib->log_fd, &ready_stream, &shm_limit);
459 &lib->log_fd, &ready_stream, &shm_limit,
460 &request_limit);
450 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
451 goto fail;
452 }
453
454 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
455 / PORT_MMAP_DATA_SIZE;
461 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
462 goto fail;
463 }
464
465 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
466 / PORT_MMAP_DATA_SIZE;
467 lib->request_limit = request_limit;
456 }
457
458 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
459 lib->shm_mmap_limit = 1;
460 }
461
462 lib->pid = read_port.id.pid;
463

--- 95 unchanged lines hidden (view full) ---

559 }
560
561 lib->unit.data = init->data;
562 lib->callbacks = init->callbacks;
563
564 lib->request_data_size = init->request_data_size;
565 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
566 / PORT_MMAP_DATA_SIZE;
468 }
469
470 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
471 lib->shm_mmap_limit = 1;
472 }
473
474 lib->pid = read_port.id.pid;
475

--- 95 unchanged lines hidden (view full) ---

571 }
572
573 lib->unit.data = init->data;
574 lib->callbacks = init->callbacks;
575
576 lib->request_data_size = init->request_data_size;
577 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
578 / PORT_MMAP_DATA_SIZE;
579 lib->request_limit = init->request_limit;
567
568 lib->processes.slot = NULL;
569 lib->ports.slot = NULL;
570
571 lib->log_fd = STDERR_FILENO;
572
573 nxt_queue_init(&lib->contexts);
574
575 lib->use_count = 0;
580
581 lib->processes.slot = NULL;
582 lib->ports.slot = NULL;
583
584 lib->log_fd = STDERR_FILENO;
585
586 nxt_queue_init(&lib->contexts);
587
588 lib->use_count = 0;
589 lib->request_count = 0;
576 lib->router_port = NULL;
577 lib->shared_port = NULL;
578
579 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
580 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
581 pthread_mutex_destroy(&lib->mutex);
582 goto fail;
583 }

--- 43 unchanged lines hidden (view full) ---

627 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
628
629 pthread_mutex_unlock(&lib->mutex);
630
631 ctx_impl->use_count = 1;
632 ctx_impl->wait_items = 0;
633 ctx_impl->online = 1;
634 ctx_impl->ready = 0;
590 lib->router_port = NULL;
591 lib->shared_port = NULL;
592
593 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
594 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
595 pthread_mutex_destroy(&lib->mutex);
596 goto fail;
597 }

--- 43 unchanged lines hidden (view full) ---

641 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
642
643 pthread_mutex_unlock(&lib->mutex);
644
645 ctx_impl->use_count = 1;
646 ctx_impl->wait_items = 0;
647 ctx_impl->online = 1;
648 ctx_impl->ready = 0;
649 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
635
636 nxt_queue_init(&ctx_impl->free_req);
637 nxt_queue_init(&ctx_impl->free_ws);
638 nxt_queue_init(&ctx_impl->active_req);
639 nxt_queue_init(&ctx_impl->ready_req);
640 nxt_queue_init(&ctx_impl->pending_rbuf);
641 nxt_queue_init(&ctx_impl->free_rbuf);
642

--- 132 unchanged lines hidden (view full) ---

775 *prev = mmap_buf->next;
776 }
777}
778
779
780static int
781nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
782 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
650
651 nxt_queue_init(&ctx_impl->free_req);
652 nxt_queue_init(&ctx_impl->free_ws);
653 nxt_queue_init(&ctx_impl->active_req);
654 nxt_queue_init(&ctx_impl->ready_req);
655 nxt_queue_init(&ctx_impl->pending_rbuf);
656 nxt_queue_init(&ctx_impl->free_rbuf);
657

--- 132 unchanged lines hidden (view full) ---

790 *prev = mmap_buf->next;
791 }
792}
793
794
795static int
796nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
797 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
783 uint32_t *shm_limit)
798 uint32_t *shm_limit, uint32_t *request_limit)
784{
785 int rc;
786 int ready_fd, router_fd, read_in_fd, read_out_fd;
787 char *unit_init, *version_end, *vars;
788 size_t version_length;
789 int64_t ready_pid, router_pid, read_pid;
790 uint32_t ready_stream, router_id, ready_id, read_id;
791

--- 28 unchanged lines hidden (view full) ---

820
821 vars = version_end + 1;
822
823 rc = sscanf(vars,
824 "%"PRIu32";"
825 "%"PRId64",%"PRIu32",%d;"
826 "%"PRId64",%"PRIu32",%d;"
827 "%"PRId64",%"PRIu32",%d,%d;"
799{
800 int rc;
801 int ready_fd, router_fd, read_in_fd, read_out_fd;
802 char *unit_init, *version_end, *vars;
803 size_t version_length;
804 int64_t ready_pid, router_pid, read_pid;
805 uint32_t ready_stream, router_id, ready_id, read_id;
806

--- 28 unchanged lines hidden (view full) ---

835
836 vars = version_end + 1;
837
838 rc = sscanf(vars,
839 "%"PRIu32";"
840 "%"PRId64",%"PRIu32",%d;"
841 "%"PRId64",%"PRIu32",%d;"
842 "%"PRId64",%"PRIu32",%d,%d;"
828 "%d,%"PRIu32,
843 "%d,%"PRIu32",%"PRIu32,
829 &ready_stream,
830 &ready_pid, &ready_id, &ready_fd,
831 &router_pid, &router_id, &router_fd,
832 &read_pid, &read_id, &read_in_fd, &read_out_fd,
844 &ready_stream,
845 &ready_pid, &ready_id, &ready_fd,
846 &router_pid, &router_id, &router_fd,
847 &read_pid, &read_id, &read_in_fd, &read_out_fd,
833 log_fd, shm_limit);
848 log_fd, shm_limit, request_limit);
834
835 if (nxt_slow_path(rc == EOF)) {
836 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
837 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
838
839 return NXT_UNIT_ERROR;
840 }
841
849
850 if (nxt_slow_path(rc == EOF)) {
851 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
852 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
853
854 return NXT_UNIT_ERROR;
855 }
856
842 if (nxt_slow_path(rc != 13)) {
857 if (nxt_slow_path(rc != 14)) {
843 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
858 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
844 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
859 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
845
846 return NXT_UNIT_ERROR;
847 }
848
849 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
850
851 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
852

--- 71 unchanged lines hidden (view full) ---

924
925
926static int
927nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
928 nxt_unit_request_info_t **preq)
929{
930 int rc;
931 pid_t pid;
860
861 return NXT_UNIT_ERROR;
862 }
863
864 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
865
866 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
867

--- 71 unchanged lines hidden (view full) ---

939
940
941static int
942nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
943 nxt_unit_request_info_t **preq)
944{
945 int rc;
946 pid_t pid;
947 uint8_t quit_param;
932 struct cmsghdr *cm;
933 nxt_port_msg_t *port_msg;
934 nxt_unit_impl_t *lib;
935 nxt_unit_recv_msg_t recv_msg;
936
937 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
938
939 recv_msg.fd[0] = -1;

--- 14 unchanged lines hidden (view full) ---

954 }
955
956 recv_msg.incoming_buf = NULL;
957
958 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
959 if (nxt_slow_path(rbuf->size == 0)) {
960 nxt_unit_debug(ctx, "read port closed");
961
948 struct cmsghdr *cm;
949 nxt_port_msg_t *port_msg;
950 nxt_unit_impl_t *lib;
951 nxt_unit_recv_msg_t recv_msg;
952
953 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
954
955 recv_msg.fd[0] = -1;

--- 14 unchanged lines hidden (view full) ---

970 }
971
972 recv_msg.incoming_buf = NULL;
973
974 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
975 if (nxt_slow_path(rbuf->size == 0)) {
976 nxt_unit_debug(ctx, "read port closed");
977
962 nxt_unit_quit(ctx);
978 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
963 rc = NXT_UNIT_OK;
964 goto done;
965 }
966
967 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
968
969 rc = NXT_UNIT_ERROR;
970 goto done;

--- 42 unchanged lines hidden (view full) ---

1013
1014 switch (port_msg->type) {
1015
1016 case _NXT_PORT_MSG_RPC_READY:
1017 rc = NXT_UNIT_OK;
1018 break;
1019
1020 case _NXT_PORT_MSG_QUIT:
979 rc = NXT_UNIT_OK;
980 goto done;
981 }
982
983 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
984
985 rc = NXT_UNIT_ERROR;
986 goto done;

--- 42 unchanged lines hidden (view full) ---

1029
1030 switch (port_msg->type) {
1031
1032 case _NXT_PORT_MSG_RPC_READY:
1033 rc = NXT_UNIT_OK;
1034 break;
1035
1036 case _NXT_PORT_MSG_QUIT:
1021 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
1037 if (recv_msg.size == sizeof(quit_param)) {
1038 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1022
1039
1023 nxt_unit_quit(ctx);
1040 } else {
1041 quit_param = NXT_QUIT_NORMAL;
1042 }
1043
1044 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1045 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1046
1047 nxt_unit_quit(ctx, quit_param);
1048
1024 rc = NXT_UNIT_OK;
1025 break;
1026
1027 case _NXT_PORT_MSG_NEW_PORT:
1028 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1029 break;
1030
1031 case _NXT_PORT_MSG_PORT_ACK:

--- 183 unchanged lines hidden (view full) ---

1215
1216
1217static int
1218nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1219{
1220 nxt_unit_impl_t *lib;
1221 nxt_unit_ctx_impl_t *ctx_impl;
1222
1049 rc = NXT_UNIT_OK;
1050 break;
1051
1052 case _NXT_PORT_MSG_NEW_PORT:
1053 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1054 break;
1055
1056 case _NXT_PORT_MSG_PORT_ACK:

--- 183 unchanged lines hidden (view full) ---

1240
1241
1242static int
1243nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1244{
1245 nxt_unit_impl_t *lib;
1246 nxt_unit_ctx_impl_t *ctx_impl;
1247
1223 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1224 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1225
1248 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1249
1250 if (nxt_slow_path(ctx_impl->ready)) {
1251 return NXT_UNIT_OK;
1252 }
1253
1226 ctx_impl->ready = 1;
1227
1254 ctx_impl->ready = 1;
1255
1228 if (lib->callbacks.ready_handler) {
1256 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1257
1258 /* Call ready_handler() only for main context. */
1259 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1229 return lib->callbacks.ready_handler(ctx);
1230 }
1231
1260 return lib->callbacks.ready_handler(ctx);
1261 }
1262
1263 if (&lib->main_ctx != ctx_impl) {
1264 /* Check if the main context is already stopped or quit. */
1265 if (nxt_slow_path(!lib->main_ctx.ready)) {
1266 ctx_impl->ready = 0;
1267
1268 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1269
1270 return NXT_UNIT_OK;
1271 }
1272
1273 if (lib->callbacks.add_port != NULL) {
1274 lib->callbacks.add_port(ctx, lib->shared_port);
1275 }
1276 }
1277
1232 return NXT_UNIT_OK;
1233}
1234
1235
1236static int
1237nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1238 nxt_unit_request_info_t **preq)
1239{

--- 496 unchanged lines hidden (view full) ---

1736
1737 return req_impl;
1738}
1739
1740
1741static void
1742nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1743{
1278 return NXT_UNIT_OK;
1279}
1280
1281
1282static int
1283nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1284 nxt_unit_request_info_t **preq)
1285{

--- 496 unchanged lines hidden (view full) ---

1782
1783 return req_impl;
1784}
1785
1786
1787static void
1788nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1789{
1790 nxt_unit_ctx_t *ctx;
1744 nxt_unit_ctx_impl_t *ctx_impl;
1745 nxt_unit_request_info_impl_t *req_impl;
1746
1791 nxt_unit_ctx_impl_t *ctx_impl;
1792 nxt_unit_request_info_impl_t *req_impl;
1793
1747 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1794 ctx = req->ctx;
1795 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1748 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1749
1750 req->response = NULL;
1751 req->response_buf = NULL;
1752
1753 if (req_impl->in_hash) {
1754 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1755 }

--- 22 unchanged lines hidden (view full) ---

1778
1779 pthread_mutex_lock(&ctx_impl->mutex);
1780
1781 nxt_queue_remove(&req_impl->link);
1782
1783 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1784
1785 pthread_mutex_unlock(&ctx_impl->mutex);
1796 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1797
1798 req->response = NULL;
1799 req->response_buf = NULL;
1800
1801 if (req_impl->in_hash) {
1802 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1803 }

--- 22 unchanged lines hidden (view full) ---

1826
1827 pthread_mutex_lock(&ctx_impl->mutex);
1828
1829 nxt_queue_remove(&req_impl->link);
1830
1831 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1832
1833 pthread_mutex_unlock(&ctx_impl->mutex);
1834
1835 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1836 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1837 }
1786}
1787
1788
1789static void
1790nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1791{
1792 nxt_unit_ctx_impl_t *ctx_impl;
1793

--- 2723 unchanged lines hidden (view full) ---

4517 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4518
4519 rc = NXT_UNIT_OK;
4520
4521 while (nxt_fast_path(ctx_impl->online)) {
4522 rc = nxt_unit_run_once_impl(ctx);
4523
4524 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
1838}
1839
1840
1841static void
1842nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1843{
1844 nxt_unit_ctx_impl_t *ctx_impl;
1845

--- 2723 unchanged lines hidden (view full) ---

4569 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4570
4571 rc = NXT_UNIT_OK;
4572
4573 while (nxt_fast_path(ctx_impl->online)) {
4574 rc = nxt_unit_run_once_impl(ctx);
4575
4576 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4525 nxt_unit_quit(ctx);
4577 nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
4526 break;
4527 }
4528 }
4529
4530 nxt_unit_ctx_release(ctx);
4531
4532 return rc;
4533}

--- 47 unchanged lines hidden (view full) ---

4581 return rc;
4582}
4583
4584
4585static int
4586nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4587{
4588 int nevents, res, err;
4578 break;
4579 }
4580 }
4581
4582 nxt_unit_ctx_release(ctx);
4583
4584 return rc;
4585}

--- 47 unchanged lines hidden (view full) ---

4633 return rc;
4634}
4635
4636
4637static int
4638nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4639{
4640 int nevents, res, err;
4641 nxt_uint_t nfds;
4589 nxt_unit_impl_t *lib;
4590 nxt_unit_ctx_impl_t *ctx_impl;
4591 nxt_unit_port_impl_t *port_impl;
4592 struct pollfd fds[2];
4593
4594 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4595
4642 nxt_unit_impl_t *lib;
4643 nxt_unit_ctx_impl_t *ctx_impl;
4644 nxt_unit_port_impl_t *port_impl;
4645 struct pollfd fds[2];
4646
4647 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4648
4596 if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4649 if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
4597 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4598 }
4599
4600 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4601 port);
4602
4603 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4604

--- 16 unchanged lines hidden (view full) ---

4621 (int) ctx_impl->read_port->id.id,
4622 (int) rbuf->size);
4623
4624 return NXT_UNIT_OK;
4625 }
4626 }
4627 }
4628
4650 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4651 }
4652
4653 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4654 port);
4655
4656 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4657

--- 16 unchanged lines hidden (view full) ---

4674 (int) ctx_impl->read_port->id.id,
4675 (int) rbuf->size);
4676
4677 return NXT_UNIT_OK;
4678 }
4679 }
4680 }
4681
4629 res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4630 if (res == NXT_UNIT_OK) {
4631 return NXT_UNIT_OK;
4682 if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4683 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4684 if (res == NXT_UNIT_OK) {
4685 return NXT_UNIT_OK;
4686 }
4687
4688 fds[1].fd = lib->shared_port->in_fd;
4689 fds[1].events = POLLIN;
4690
4691 nfds = 2;
4692
4693 } else {
4694 nfds = 1;
4632 }
4633
4634 fds[0].fd = ctx_impl->read_port->in_fd;
4635 fds[0].events = POLLIN;
4636 fds[0].revents = 0;
4637
4695 }
4696
4697 fds[0].fd = ctx_impl->read_port->in_fd;
4698 fds[0].events = POLLIN;
4699 fds[0].revents = 0;
4700
4638 fds[1].fd = lib->shared_port->in_fd;
4639 fds[1].events = POLLIN;
4640 fds[1].revents = 0;
4641
4701 fds[1].revents = 0;
4702
4642 nevents = poll(fds, 2, -1);
4703 nevents = poll(fds, nfds, -1);
4643 if (nxt_slow_path(nevents == -1)) {
4644 err = errno;
4645
4646 if (err == EINTR) {
4647 goto retry;
4648 }
4649
4650 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",

--- 30 unchanged lines hidden (view full) ---

4681 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4682 fds[1].revents);
4683
4684 return NXT_UNIT_ERROR;
4685}
4686
4687
4688static int
4704 if (nxt_slow_path(nevents == -1)) {
4705 err = errno;
4706
4707 if (err == EINTR) {
4708 goto retry;
4709 }
4710
4711 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",

--- 30 unchanged lines hidden (view full) ---

4742 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4743 fds[1].revents);
4744
4745 return NXT_UNIT_ERROR;
4746}
4747
4748
4749static int
4750nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
4751{
4752 nxt_unit_impl_t *lib;
4753 nxt_unit_ctx_impl_t *ctx_impl;
4754
4755 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4756 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4757
4758 return (ctx_impl->ready
4759 && (lib->request_limit == 0
4760 || lib->request_count < lib->request_limit));
4761}
4762
4763
4764static int
4689nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4690{
4691 int rc;
4692 nxt_queue_t pending_rbuf;
4693 nxt_unit_ctx_impl_t *ctx_impl;
4694 nxt_unit_read_buf_t *rbuf;
4695
4696 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

--- 21 unchanged lines hidden (view full) ---

4718 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4719
4720 } else {
4721 nxt_unit_read_buf_release(ctx, rbuf);
4722 }
4723
4724 } nxt_queue_loop;
4725
4765nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4766{
4767 int rc;
4768 nxt_queue_t pending_rbuf;
4769 nxt_unit_ctx_impl_t *ctx_impl;
4770 nxt_unit_read_buf_t *rbuf;
4771
4772 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

--- 21 unchanged lines hidden (view full) ---

4794 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4795
4796 } else {
4797 nxt_unit_read_buf_release(ctx, rbuf);
4798 }
4799
4800 } nxt_queue_loop;
4801
4802 if (!ctx_impl->ready) {
4803 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
4804 }
4805
4726 return rc;
4727}
4728
4729
4730static void
4731nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4732{
4733 int res;

--- 164 unchanged lines hidden (view full) ---

4898
4899
4900int
4901nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4902{
4903 int rc;
4904 nxt_unit_impl_t *lib;
4905 nxt_unit_read_buf_t *rbuf;
4806 return rc;
4807}
4808
4809
4810static void
4811nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4812{
4813 int res;

--- 164 unchanged lines hidden (view full) ---

4978
4979
4980int
4981nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4982{
4983 int rc;
4984 nxt_unit_impl_t *lib;
4985 nxt_unit_read_buf_t *rbuf;
4906 nxt_unit_ctx_impl_t *ctx_impl;
4907
4908 nxt_unit_ctx_use(ctx);
4909
4910 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4986
4987 nxt_unit_ctx_use(ctx);
4988
4989 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4911 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4912
4913 rc = NXT_UNIT_OK;
4914
4990
4991 rc = NXT_UNIT_OK;
4992
4915 while (nxt_fast_path(ctx_impl->online)) {
4993 while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4916 rbuf = nxt_unit_read_buf_get(ctx);
4917 if (nxt_slow_path(rbuf == NULL)) {
4918 rc = NXT_UNIT_ERROR;
4919 break;
4920 }
4921
4922 retry:
4923

--- 20 unchanged lines hidden (view full) ---

4944
4945
4946nxt_unit_request_info_t *
4947nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4948{
4949 int rc;
4950 nxt_unit_impl_t *lib;
4951 nxt_unit_read_buf_t *rbuf;
4994 rbuf = nxt_unit_read_buf_get(ctx);
4995 if (nxt_slow_path(rbuf == NULL)) {
4996 rc = NXT_UNIT_ERROR;
4997 break;
4998 }
4999
5000 retry:
5001

--- 20 unchanged lines hidden (view full) ---

5022
5023
5024nxt_unit_request_info_t *
5025nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
5026{
5027 int rc;
5028 nxt_unit_impl_t *lib;
5029 nxt_unit_read_buf_t *rbuf;
4952 nxt_unit_ctx_impl_t *ctx_impl;
4953 nxt_unit_request_info_t *req;
4954
4955 nxt_unit_ctx_use(ctx);
4956
4957 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5030 nxt_unit_request_info_t *req;
5031
5032 nxt_unit_ctx_use(ctx);
5033
5034 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4958 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4959
4960 req = NULL;
4961
5035
5036 req = NULL;
5037
4962 if (nxt_slow_path(!ctx_impl->online)) {
5038 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
4963 goto done;
4964 }
4965
4966 rbuf = nxt_unit_read_buf_get(ctx);
4967 if (nxt_slow_path(rbuf == NULL)) {
4968 goto done;
4969 }
4970
5039 goto done;
5040 }
5041
5042 rbuf = nxt_unit_read_buf_get(ctx);
5043 if (nxt_slow_path(rbuf == NULL)) {
5044 goto done;
5045 }
5046
4971 rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
5047 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4972 if (rc != NXT_UNIT_OK) {
4973 nxt_unit_read_buf_release(ctx, rbuf);
4974 goto done;
4975 }
4976
4977 (void) nxt_unit_process_msg(ctx, rbuf, &req);
4978
4979done:
4980
4981 nxt_unit_ctx_release(ctx);
4982
4983 return req;
4984}
4985
4986
4987int
5048 if (rc != NXT_UNIT_OK) {
5049 nxt_unit_read_buf_release(ctx, rbuf);
5050 goto done;
5051 }
5052
5053 (void) nxt_unit_process_msg(ctx, rbuf, &req);
5054
5055done:
5056
5057 nxt_unit_ctx_release(ctx);
5058
5059 return req;
5060}
5061
5062
5063int
4988nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4989{
4990 nxt_unit_impl_t *lib;
4991
4992 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4993
4994 return (ctx == &lib->main_ctx.ctx);
4995}
4996
4997
4998int
4999nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5000{
5001 int rc;
5002
5003 nxt_unit_ctx_use(ctx);
5004
5005 rc = nxt_unit_process_port_msg_impl(ctx, port);
5006

--- 5 unchanged lines hidden (view full) ---

5012
5013static int
5014nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5015{
5016 int rc;
5017 nxt_unit_impl_t *lib;
5018 nxt_unit_read_buf_t *rbuf;
5019
5064nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5065{
5066 int rc;
5067
5068 nxt_unit_ctx_use(ctx);
5069
5070 rc = nxt_unit_process_port_msg_impl(ctx, port);
5071

--- 5 unchanged lines hidden (view full) ---

5077
5078static int
5079nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5080{
5081 int rc;
5082 nxt_unit_impl_t *lib;
5083 nxt_unit_read_buf_t *rbuf;
5084
5085 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5086
5087 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
5088 return NXT_UNIT_AGAIN;
5089 }
5090
5020 rbuf = nxt_unit_read_buf_get(ctx);
5021 if (nxt_slow_path(rbuf == NULL)) {
5022 return NXT_UNIT_ERROR;
5023 }
5024
5091 rbuf = nxt_unit_read_buf_get(ctx);
5092 if (nxt_slow_path(rbuf == NULL)) {
5093 return NXT_UNIT_ERROR;
5094 }
5095
5025 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5026
5027 if (port == lib->shared_port) {
5028 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5029
5030 } else {
5031 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5032 }
5033
5034 if (rc != NXT_UNIT_OK) {

--- 154 unchanged lines hidden (view full) ---

5189
5190 pthread_mutex_lock(&lib->mutex);
5191
5192 nxt_queue_remove(&ctx_impl->link);
5193
5194 pthread_mutex_unlock(&lib->mutex);
5195
5196 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5096 if (port == lib->shared_port) {
5097 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5098
5099 } else {
5100 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5101 }
5102
5103 if (rc != NXT_UNIT_OK) {

--- 154 unchanged lines hidden (view full) ---

5258
5259 pthread_mutex_lock(&lib->mutex);
5260
5261 nxt_queue_remove(&ctx_impl->link);
5262
5263 pthread_mutex_unlock(&lib->mutex);
5264
5265 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5197 nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5266 nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
5198 nxt_unit_port_release(ctx_impl->read_port);
5199 }
5200
5201 if (ctx_impl != &lib->main_ctx) {
5202 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5203 }
5204
5205 nxt_unit_lib_release(lib);

--- 394 unchanged lines hidden (view full) ---

5600
5601 nxt_unit_awake_ctx(ctx, ctx_impl);
5602
5603 } nxt_queue_loop;
5604}
5605
5606
5607static void
5267 nxt_unit_port_release(ctx_impl->read_port);
5268 }
5269
5270 if (ctx_impl != &lib->main_ctx) {
5271 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5272 }
5273
5274 nxt_unit_lib_release(lib);

--- 394 unchanged lines hidden (view full) ---

5669
5670 nxt_unit_awake_ctx(ctx, ctx_impl);
5671
5672 } nxt_queue_loop;
5673}
5674
5675
5676static void
5608nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5677nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
5678 nxt_unit_port_id_t *port_id)
5609{
5610 nxt_unit_port_t *port;
5611 nxt_unit_port_impl_t *port_impl;
5612
5613 pthread_mutex_lock(&lib->mutex);
5614
5615 port = nxt_unit_remove_port_unsafe(lib, port_id);
5616
5617 if (nxt_fast_path(port != NULL)) {
5618 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5619
5620 nxt_queue_remove(&port_impl->link);
5621 }
5622
5623 pthread_mutex_unlock(&lib->mutex);
5624
5625 if (lib->callbacks.remove_port != NULL && port != NULL) {
5679{
5680 nxt_unit_port_t *port;
5681 nxt_unit_port_impl_t *port_impl;
5682
5683 pthread_mutex_lock(&lib->mutex);
5684
5685 port = nxt_unit_remove_port_unsafe(lib, port_id);
5686
5687 if (nxt_fast_path(port != NULL)) {
5688 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5689
5690 nxt_queue_remove(&port_impl->link);
5691 }
5692
5693 pthread_mutex_unlock(&lib->mutex);
5694
5695 if (lib->callbacks.remove_port != NULL && port != NULL) {
5626 lib->callbacks.remove_port(&lib->unit, port);
5696 lib->callbacks.remove_port(&lib->unit, ctx, port);
5627 }
5628
5629 if (nxt_fast_path(port != NULL)) {
5630 nxt_unit_port_release(port);
5631 }
5632}
5633
5634

--- 60 unchanged lines hidden (view full) ---

5695
5696 pthread_mutex_unlock(&lib->mutex);
5697
5698 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5699
5700 nxt_queue_remove(&port->link);
5701
5702 if (lib->callbacks.remove_port != NULL) {
5697 }
5698
5699 if (nxt_fast_path(port != NULL)) {
5700 nxt_unit_port_release(port);
5701 }
5702}
5703
5704

--- 60 unchanged lines hidden (view full) ---

5765
5766 pthread_mutex_unlock(&lib->mutex);
5767
5768 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5769
5770 nxt_queue_remove(&port->link);
5771
5772 if (lib->callbacks.remove_port != NULL) {
5703 lib->callbacks.remove_port(&lib->unit, &port->port);
5773 lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
5704 }
5705
5706 nxt_unit_port_release(&port->port);
5707
5708 } nxt_queue_loop;
5709
5710 nxt_unit_process_release(process);
5711}
5712
5713
5714static void
5774 }
5775
5776 nxt_unit_port_release(&port->port);
5777
5778 } nxt_queue_loop;
5779
5780 nxt_unit_process_release(process);
5781}
5782
5783
5784static void
5715nxt_unit_quit(nxt_unit_ctx_t *ctx)
5785nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
5716{
5786{
5717 nxt_port_msg_t msg;
5787 nxt_bool_t skip_graceful_broadcast, quit;
5718 nxt_unit_impl_t *lib;
5719 nxt_unit_ctx_impl_t *ctx_impl;
5720 nxt_unit_callbacks_t *cb;
5721 nxt_unit_request_info_t *req;
5722 nxt_unit_request_info_impl_t *req_impl;
5723
5788 nxt_unit_impl_t *lib;
5789 nxt_unit_ctx_impl_t *ctx_impl;
5790 nxt_unit_callbacks_t *cb;
5791 nxt_unit_request_info_t *req;
5792 nxt_unit_request_info_impl_t *req_impl;
5793
5794 struct {
5795 nxt_port_msg_t msg;
5796 uint8_t quit_param;
5797 } nxt_packed m;
5798
5724 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5725 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5726
5799 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5800 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5801
5727 if (!ctx_impl->online) {
5802 nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
5803 ctx_impl->online);
5804
5805 if (nxt_slow_path(!ctx_impl->online)) {
5728 return;
5729 }
5730
5806 return;
5807 }
5808
5731 ctx_impl->online = 0;
5809 skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
5810 && !ctx_impl->ready;
5732
5733 cb = &lib->callbacks;
5734
5811
5812 cb = &lib->callbacks;
5813
5735 if (cb->quit != NULL) {
5736 cb->quit(ctx);
5814 if (nxt_fast_path(ctx_impl->ready)) {
5815 ctx_impl->ready = 0;
5816
5817 if (cb->remove_port != NULL) {
5818 cb->remove_port(&lib->unit, ctx, lib->shared_port);
5819 }
5737 }
5738
5820 }
5821
5739 nxt_queue_each(req_impl, &ctx_impl->active_req,
5740 nxt_unit_request_info_impl_t, link)
5741 {
5742 req = &req_impl->req;
5822 if (quit_param == NXT_QUIT_GRACEFUL) {
5823 pthread_mutex_lock(&ctx_impl->mutex);
5743
5824
5744 nxt_unit_req_warn(req, "active request on ctx quit");
5825 quit = nxt_queue_is_empty(&ctx_impl->active_req)
5826 && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
5827 && ctx_impl->wait_items == 0;
5745
5828
5746 if (cb->close_handler) {
5747 nxt_unit_req_debug(req, "close_handler");
5829 pthread_mutex_unlock(&ctx_impl->mutex);
5748
5830
5749 cb->close_handler(req);
5831 } else {
5832 quit = 1;
5833 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
5834 }
5750
5835
5751 } else {
5752 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5836 if (quit) {
5837 ctx_impl->online = 0;
5838
5839 if (cb->quit != NULL) {
5840 cb->quit(ctx);
5753 }
5754
5841 }
5842
5755 } nxt_queue_loop;
5843 nxt_queue_each(req_impl, &ctx_impl->active_req,
5844 nxt_unit_request_info_impl_t, link)
5845 {
5846 req = &req_impl->req;
5756
5847
5757 if (ctx != &lib->main_ctx.ctx) {
5848 nxt_unit_req_warn(req, "active request on ctx quit");
5849
5850 if (cb->close_handler) {
5851 nxt_unit_req_debug(req, "close_handler");
5852
5853 cb->close_handler(req);
5854
5855 } else {
5856 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5857 }
5858
5859 } nxt_queue_loop;
5860
5861 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5862 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
5863 }
5864 }
5865
5866 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
5758 return;
5759 }
5760
5867 return;
5868 }
5869
5761 memset(&msg, 0, sizeof(nxt_port_msg_t));
5870 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5762
5871
5763 msg.pid = lib->pid;
5764 msg.type = _NXT_PORT_MSG_QUIT;
5872 m.msg.pid = lib->pid;
5873 m.msg.type = _NXT_PORT_MSG_QUIT;
5874 m.quit_param = quit_param;
5765
5766 pthread_mutex_lock(&lib->mutex);
5767
5768 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5769
5770 if (ctx == &ctx_impl->ctx
5771 || ctx_impl->read_port == NULL
5772 || ctx_impl->read_port->out_fd == -1)
5773 {
5774 continue;
5775 }
5776
5777 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5875
5876 pthread_mutex_lock(&lib->mutex);
5877
5878 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5879
5880 if (ctx == &ctx_impl->ctx
5881 || ctx_impl->read_port == NULL
5882 || ctx_impl->read_port->out_fd == -1)
5883 {
5884 continue;
5885 }
5886
5887 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5778 &msg, sizeof(msg), NULL, 0);
5888 &m, sizeof(m), NULL, 0);
5779
5780 } nxt_queue_loop;
5781
5782 pthread_mutex_unlock(&lib->mutex);
5783}
5784
5785
5786static int

--- 297 unchanged lines hidden (view full) ---

6084{
6085 int res;
6086 nxt_unit_port_impl_t *port_impl;
6087
6088 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6089
6090retry:
6091
5889
5890 } nxt_queue_loop;
5891
5892 pthread_mutex_unlock(&lib->mutex);
5893}
5894
5895
5896static int

--- 297 unchanged lines hidden (view full) ---

6194{
6195 int res;
6196 nxt_unit_port_impl_t *port_impl;
6197
6198 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6199
6200retry:
6201
6092 res = nxt_unit_app_queue_recv(port, rbuf);
6202 res = nxt_unit_app_queue_recv(ctx, port, rbuf);
6093
6203
6204 if (res == NXT_UNIT_OK) {
6205 return NXT_UNIT_OK;
6206 }
6207
6094 if (res == NXT_UNIT_AGAIN) {
6095 res = nxt_unit_port_recv(ctx, port, rbuf);
6096 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6097 return NXT_UNIT_ERROR;
6098 }
6099
6100 if (nxt_unit_is_read_queue(rbuf)) {
6101 nxt_app_queue_notification_received(port_impl->queue);

--- 87 unchanged lines hidden (view full) ---

6189
6190 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6191
6192 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6193}
6194
6195
6196static int
6208 if (res == NXT_UNIT_AGAIN) {
6209 res = nxt_unit_port_recv(ctx, port, rbuf);
6210 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6211 return NXT_UNIT_ERROR;
6212 }
6213
6214 if (nxt_unit_is_read_queue(rbuf)) {
6215 nxt_app_queue_notification_received(port_impl->queue);

--- 87 unchanged lines hidden (view full) ---

6303
6304 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6305
6306 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6307}
6308
6309
6310static int
6197nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6311nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6312 nxt_unit_read_buf_t *rbuf)
6198{
6199 uint32_t cookie;
6200 nxt_port_msg_t *port_msg;
6201 nxt_app_queue_t *queue;
6313{
6314 uint32_t cookie;
6315 nxt_port_msg_t *port_msg;
6316 nxt_app_queue_t *queue;
6317 nxt_unit_impl_t *lib;
6202 nxt_unit_port_impl_t *port_impl;
6203
6318 nxt_unit_port_impl_t *port_impl;
6319
6320 struct {
6321 nxt_port_msg_t msg;
6322 uint8_t quit_param;
6323 } nxt_packed m;
6324
6204 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6205 queue = port_impl->queue;
6206
6207retry:
6208
6209 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6210
6211 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6212
6213 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6214 port_msg = (nxt_port_msg_t *) rbuf->buf;
6215
6216 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6325 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6326 queue = port_impl->queue;
6327
6328retry:
6329
6330 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6331
6332 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6333
6334 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6335 port_msg = (nxt_port_msg_t *) rbuf->buf;
6336
6337 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6338 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6339
6340 if (lib->request_limit != 0) {
6341 nxt_atomic_fetch_add(&lib->request_count, 1);
6342
6343 if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
6344 nxt_unit_debug(ctx, "request limit reached");
6345
6346 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
6347
6348 m.msg.pid = lib->pid;
6349 m.msg.type = _NXT_PORT_MSG_QUIT;
6350 m.quit_param = NXT_QUIT_GRACEFUL;
6351
6352 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
6353 &m, sizeof(m), NULL, 0);
6354 }
6355 }
6356
6217 return NXT_UNIT_OK;
6218 }
6219
6220 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6221
6222 goto retry;
6223 }
6224

--- 481 unchanged lines hidden ---
6357 return NXT_UNIT_OK;
6358 }
6359
6360 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6361
6362 goto retry;
6363 }
6364

--- 481 unchanged lines hidden ---