Deleted Added
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 11 unchanged lines hidden (view full) ---

20#if (NXT_HAVE_MEMFD_CREATE)
21#include <linux/memfd.h>
22#endif
23
24#define NXT_UNIT_MAX_PLAIN_SIZE 1024
25#define NXT_UNIT_LOCAL_BUF_SIZE \
26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27
28typedef struct nxt_unit_impl_s nxt_unit_impl_t;
29typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
30typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
31typedef struct nxt_unit_process_s nxt_unit_process_t;
32typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
33typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
34typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
35typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;

--- 10 unchanged lines hidden (view full) ---

46nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50 nxt_unit_mmap_buf_t *mmap_buf);
51nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56 int queue_fd);
57static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
58 nxt_unit_request_info_t **preq);
59static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
60 nxt_unit_recv_msg_t *recv_msg);
61static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
62static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,

--- 62 unchanged lines hidden (view full) ---

125static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
126
127static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
128static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
129 pid_t pid, int remove);
130static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
131static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
132static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
133static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
134static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
135nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
136nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
137nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
138nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
139static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
140 nxt_unit_port_t *port);

--- 4 unchanged lines hidden (view full) ---

145 nxt_unit_port_t *port, int queue_fd);
146
147nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
148nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
149static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
150 nxt_unit_port_t *port, void *queue);
151static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
152 nxt_queue_t *awaiting_req);
153static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
154 nxt_unit_port_id_t *port_id);
155static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
156 nxt_unit_port_id_t *port_id);
157static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
158static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
159 nxt_unit_process_t *process);
160static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
161static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
162static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
163 nxt_unit_port_t *port, const void *buf, size_t buf_size,
164 const void *oob, size_t oob_size);
165static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
166 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
167static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
170 nxt_unit_read_buf_t *src);
171static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
173static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174 nxt_unit_read_buf_t *rbuf);
175static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
176 nxt_unit_read_buf_t *rbuf);
177static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
178 nxt_unit_read_buf_t *rbuf);
179nxt_inline int nxt_unit_close(int fd);
180static int nxt_unit_fd_blocking(int fd);
181
182static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
183 nxt_unit_port_t *port);
184static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
185 nxt_unit_port_id_t *port_id, int remove);

--- 120 unchanged lines hidden (view full) ---

306 nxt_queue_t ready_req;
307
308 /* of nxt_unit_read_buf_t */
309 nxt_queue_t pending_rbuf;
310
311 /* of nxt_unit_read_buf_t */
312 nxt_queue_t free_rbuf;
313
314 int online;
315 int ready;
316
317 nxt_unit_mmap_buf_t ctx_buf[2];
318 nxt_unit_read_buf_t ctx_read_buf;
319
320 nxt_unit_request_info_impl_t req;
321};
322
323

--- 15 unchanged lines hidden (view full) ---

339};
340
341
342struct nxt_unit_impl_s {
343 nxt_unit_t unit;
344 nxt_unit_callbacks_t callbacks;
345
346 nxt_atomic_t use_count;
347
348 uint32_t request_data_size;
349 uint32_t shm_mmap_limit;
350
351 pthread_mutex_t mutex;
352
353 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
354 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
355
356 nxt_unit_port_t *router_port;
357 nxt_unit_port_t *shared_port;

--- 51 unchanged lines hidden (view full) ---

409} nxt_unit_port_hash_id_t;
410
411
412nxt_unit_ctx_t *
413nxt_unit_init(nxt_unit_init_t *init)
414{
415 int rc, queue_fd;
416 void *mem;
417 uint32_t ready_stream, shm_limit;
418 nxt_unit_ctx_t *ctx;
419 nxt_unit_impl_t *lib;
420 nxt_unit_port_t ready_port, router_port, read_port;
421
422 lib = nxt_unit_create(init);
423 if (nxt_slow_path(lib == NULL)) {
424 return NULL;
425 }

--- 15 unchanged lines hidden (view full) ---

441 ready_port.id.id);
442 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
443 router_port.id.id);
444 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
445 read_port.id.id);
446
447 } else {
448 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
449 &lib->log_fd, &ready_stream, &shm_limit);
450 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
451 goto fail;
452 }
453
454 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
455 / PORT_MMAP_DATA_SIZE;
456 }
457
458 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
459 lib->shm_mmap_limit = 1;
460 }
461
462 lib->pid = read_port.id.pid;
463

--- 95 unchanged lines hidden (view full) ---

559 }
560
561 lib->unit.data = init->data;
562 lib->callbacks = init->callbacks;
563
564 lib->request_data_size = init->request_data_size;
565 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
566 / PORT_MMAP_DATA_SIZE;
567
568 lib->processes.slot = NULL;
569 lib->ports.slot = NULL;
570
571 lib->log_fd = STDERR_FILENO;
572
573 nxt_queue_init(&lib->contexts);
574
575 lib->use_count = 0;
576 lib->router_port = NULL;
577 lib->shared_port = NULL;
578
579 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
580 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
581 pthread_mutex_destroy(&lib->mutex);
582 goto fail;
583 }

--- 43 unchanged lines hidden (view full) ---

627 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
628
629 pthread_mutex_unlock(&lib->mutex);
630
631 ctx_impl->use_count = 1;
632 ctx_impl->wait_items = 0;
633 ctx_impl->online = 1;
634 ctx_impl->ready = 0;
635
636 nxt_queue_init(&ctx_impl->free_req);
637 nxt_queue_init(&ctx_impl->free_ws);
638 nxt_queue_init(&ctx_impl->active_req);
639 nxt_queue_init(&ctx_impl->ready_req);
640 nxt_queue_init(&ctx_impl->pending_rbuf);
641 nxt_queue_init(&ctx_impl->free_rbuf);
642

--- 132 unchanged lines hidden (view full) ---

775 *prev = mmap_buf->next;
776 }
777}
778
779
780static int
781nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
782 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
783 uint32_t *shm_limit)
784{
785 int rc;
786 int ready_fd, router_fd, read_in_fd, read_out_fd;
787 char *unit_init, *version_end, *vars;
788 size_t version_length;
789 int64_t ready_pid, router_pid, read_pid;
790 uint32_t ready_stream, router_id, ready_id, read_id;
791

--- 28 unchanged lines hidden (view full) ---

820
821 vars = version_end + 1;
822
823 rc = sscanf(vars,
824 "%"PRIu32";"
825 "%"PRId64",%"PRIu32",%d;"
826 "%"PRId64",%"PRIu32",%d;"
827 "%"PRId64",%"PRIu32",%d,%d;"
828 "%d,%"PRIu32,
829 &ready_stream,
830 &ready_pid, &ready_id, &ready_fd,
831 &router_pid, &router_id, &router_fd,
832 &read_pid, &read_id, &read_in_fd, &read_out_fd,
833 log_fd, shm_limit);
834
835 if (nxt_slow_path(rc == EOF)) {
836 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
837 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
838
839 return NXT_UNIT_ERROR;
840 }
841
842 if (nxt_slow_path(rc != 13)) {
843 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
844 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
845
846 return NXT_UNIT_ERROR;
847 }
848
849 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
850
851 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
852

--- 71 unchanged lines hidden (view full) ---

924
925
926static int
927nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
928 nxt_unit_request_info_t **preq)
929{
930 int rc;
931 pid_t pid;
932 struct cmsghdr *cm;
933 nxt_port_msg_t *port_msg;
934 nxt_unit_impl_t *lib;
935 nxt_unit_recv_msg_t recv_msg;
936
937 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
938
939 recv_msg.fd[0] = -1;

--- 14 unchanged lines hidden (view full) ---

954 }
955
956 recv_msg.incoming_buf = NULL;
957
958 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
959 if (nxt_slow_path(rbuf->size == 0)) {
960 nxt_unit_debug(ctx, "read port closed");
961
962 nxt_unit_quit(ctx);
963 rc = NXT_UNIT_OK;
964 goto done;
965 }
966
967 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
968
969 rc = NXT_UNIT_ERROR;
970 goto done;

--- 42 unchanged lines hidden (view full) ---

1013
1014 switch (port_msg->type) {
1015
1016 case _NXT_PORT_MSG_RPC_READY:
1017 rc = NXT_UNIT_OK;
1018 break;
1019
1020 case _NXT_PORT_MSG_QUIT:
1021 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
1022
1023 nxt_unit_quit(ctx);
1024 rc = NXT_UNIT_OK;
1025 break;
1026
1027 case _NXT_PORT_MSG_NEW_PORT:
1028 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1029 break;
1030
1031 case _NXT_PORT_MSG_PORT_ACK:

--- 183 unchanged lines hidden (view full) ---

1215
1216
1217static int
1218nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1219{
1220 nxt_unit_impl_t *lib;
1221 nxt_unit_ctx_impl_t *ctx_impl;
1222
1223 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1224 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1225
1226 ctx_impl->ready = 1;
1227
1228 if (lib->callbacks.ready_handler) {
1229 return lib->callbacks.ready_handler(ctx);
1230 }
1231
1232 return NXT_UNIT_OK;
1233}
1234
1235
1236static int
1237nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1238 nxt_unit_request_info_t **preq)
1239{

--- 496 unchanged lines hidden (view full) ---

1736
1737 return req_impl;
1738}
1739
1740
1741static void
1742nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1743{
1744 nxt_unit_ctx_impl_t *ctx_impl;
1745 nxt_unit_request_info_impl_t *req_impl;
1746
1747 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1748 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1749
1750 req->response = NULL;
1751 req->response_buf = NULL;
1752
1753 if (req_impl->in_hash) {
1754 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1755 }

--- 22 unchanged lines hidden (view full) ---

1778
1779 pthread_mutex_lock(&ctx_impl->mutex);
1780
1781 nxt_queue_remove(&req_impl->link);
1782
1783 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1784
1785 pthread_mutex_unlock(&ctx_impl->mutex);
1786}
1787
1788
1789static void
1790nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1791{
1792 nxt_unit_ctx_impl_t *ctx_impl;
1793

--- 2723 unchanged lines hidden (view full) ---

4517 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4518
4519 rc = NXT_UNIT_OK;
4520
4521 while (nxt_fast_path(ctx_impl->online)) {
4522 rc = nxt_unit_run_once_impl(ctx);
4523
4524 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4525 nxt_unit_quit(ctx);
4526 break;
4527 }
4528 }
4529
4530 nxt_unit_ctx_release(ctx);
4531
4532 return rc;
4533}

--- 47 unchanged lines hidden (view full) ---

4581 return rc;
4582}
4583
4584
4585static int
4586nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4587{
4588 int nevents, res, err;
4589 nxt_unit_impl_t *lib;
4590 nxt_unit_ctx_impl_t *ctx_impl;
4591 nxt_unit_port_impl_t *port_impl;
4592 struct pollfd fds[2];
4593
4594 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4595
4596 if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4597 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4598 }
4599
4600 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4601 port);
4602
4603 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4604

--- 16 unchanged lines hidden (view full) ---

4621 (int) ctx_impl->read_port->id.id,
4622 (int) rbuf->size);
4623
4624 return NXT_UNIT_OK;
4625 }
4626 }
4627 }
4628
4629 res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4630 if (res == NXT_UNIT_OK) {
4631 return NXT_UNIT_OK;
4632 }
4633
4634 fds[0].fd = ctx_impl->read_port->in_fd;
4635 fds[0].events = POLLIN;
4636 fds[0].revents = 0;
4637
4638 fds[1].fd = lib->shared_port->in_fd;
4639 fds[1].events = POLLIN;
4640 fds[1].revents = 0;
4641
4642 nevents = poll(fds, 2, -1);
4643 if (nxt_slow_path(nevents == -1)) {
4644 err = errno;
4645
4646 if (err == EINTR) {
4647 goto retry;
4648 }
4649
4650 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",

--- 30 unchanged lines hidden (view full) ---

4681 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4682 fds[1].revents);
4683
4684 return NXT_UNIT_ERROR;
4685}
4686
4687
4688static int
4689nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4690{
4691 int rc;
4692 nxt_queue_t pending_rbuf;
4693 nxt_unit_ctx_impl_t *ctx_impl;
4694 nxt_unit_read_buf_t *rbuf;
4695
4696 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

--- 21 unchanged lines hidden (view full) ---

4718 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4719
4720 } else {
4721 nxt_unit_read_buf_release(ctx, rbuf);
4722 }
4723
4724 } nxt_queue_loop;
4725
4726 return rc;
4727}
4728
4729
4730static void
4731nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4732{
4733 int res;

--- 164 unchanged lines hidden (view full) ---

4898
4899
4900int
4901nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4902{
4903 int rc;
4904 nxt_unit_impl_t *lib;
4905 nxt_unit_read_buf_t *rbuf;
4906 nxt_unit_ctx_impl_t *ctx_impl;
4907
4908 nxt_unit_ctx_use(ctx);
4909
4910 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4911 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4912
4913 rc = NXT_UNIT_OK;
4914
4915 while (nxt_fast_path(ctx_impl->online)) {
4916 rbuf = nxt_unit_read_buf_get(ctx);
4917 if (nxt_slow_path(rbuf == NULL)) {
4918 rc = NXT_UNIT_ERROR;
4919 break;
4920 }
4921
4922 retry:
4923

--- 20 unchanged lines hidden (view full) ---

4944
4945
4946nxt_unit_request_info_t *
4947nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4948{
4949 int rc;
4950 nxt_unit_impl_t *lib;
4951 nxt_unit_read_buf_t *rbuf;
4952 nxt_unit_ctx_impl_t *ctx_impl;
4953 nxt_unit_request_info_t *req;
4954
4955 nxt_unit_ctx_use(ctx);
4956
4957 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4958 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4959
4960 req = NULL;
4961
4962 if (nxt_slow_path(!ctx_impl->online)) {
4963 goto done;
4964 }
4965
4966 rbuf = nxt_unit_read_buf_get(ctx);
4967 if (nxt_slow_path(rbuf == NULL)) {
4968 goto done;
4969 }
4970
4971 rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4972 if (rc != NXT_UNIT_OK) {
4973 nxt_unit_read_buf_release(ctx, rbuf);
4974 goto done;
4975 }
4976
4977 (void) nxt_unit_process_msg(ctx, rbuf, &req);
4978
4979done:
4980
4981 nxt_unit_ctx_release(ctx);
4982
4983 return req;
4984}
4985
4986
4987int
4988nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4989{
4990 nxt_unit_impl_t *lib;
4991
4992 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4993
4994 return (ctx == &lib->main_ctx.ctx);
4995}
4996
4997
4998int
4999nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5000{
5001 int rc;
5002
5003 nxt_unit_ctx_use(ctx);
5004
5005 rc = nxt_unit_process_port_msg_impl(ctx, port);
5006

--- 5 unchanged lines hidden (view full) ---

5012
5013static int
5014nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5015{
5016 int rc;
5017 nxt_unit_impl_t *lib;
5018 nxt_unit_read_buf_t *rbuf;
5019
5020 rbuf = nxt_unit_read_buf_get(ctx);
5021 if (nxt_slow_path(rbuf == NULL)) {
5022 return NXT_UNIT_ERROR;
5023 }
5024
5025 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5026
5027 if (port == lib->shared_port) {
5028 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5029
5030 } else {
5031 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5032 }
5033
5034 if (rc != NXT_UNIT_OK) {

--- 154 unchanged lines hidden (view full) ---

5189
5190 pthread_mutex_lock(&lib->mutex);
5191
5192 nxt_queue_remove(&ctx_impl->link);
5193
5194 pthread_mutex_unlock(&lib->mutex);
5195
5196 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5197 nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5198 nxt_unit_port_release(ctx_impl->read_port);
5199 }
5200
5201 if (ctx_impl != &lib->main_ctx) {
5202 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5203 }
5204
5205 nxt_unit_lib_release(lib);

--- 394 unchanged lines hidden (view full) ---

5600
5601 nxt_unit_awake_ctx(ctx, ctx_impl);
5602
5603 } nxt_queue_loop;
5604}
5605
5606
5607static void
5608nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5609{
5610 nxt_unit_port_t *port;
5611 nxt_unit_port_impl_t *port_impl;
5612
5613 pthread_mutex_lock(&lib->mutex);
5614
5615 port = nxt_unit_remove_port_unsafe(lib, port_id);
5616
5617 if (nxt_fast_path(port != NULL)) {
5618 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5619
5620 nxt_queue_remove(&port_impl->link);
5621 }
5622
5623 pthread_mutex_unlock(&lib->mutex);
5624
5625 if (lib->callbacks.remove_port != NULL && port != NULL) {
5626 lib->callbacks.remove_port(&lib->unit, port);
5627 }
5628
5629 if (nxt_fast_path(port != NULL)) {
5630 nxt_unit_port_release(port);
5631 }
5632}
5633
5634

--- 60 unchanged lines hidden (view full) ---

5695
5696 pthread_mutex_unlock(&lib->mutex);
5697
5698 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5699
5700 nxt_queue_remove(&port->link);
5701
5702 if (lib->callbacks.remove_port != NULL) {
5703 lib->callbacks.remove_port(&lib->unit, &port->port);
5704 }
5705
5706 nxt_unit_port_release(&port->port);
5707
5708 } nxt_queue_loop;
5709
5710 nxt_unit_process_release(process);
5711}
5712
5713
5714static void
5715nxt_unit_quit(nxt_unit_ctx_t *ctx)
5716{
5717 nxt_port_msg_t msg;
5718 nxt_unit_impl_t *lib;
5719 nxt_unit_ctx_impl_t *ctx_impl;
5720 nxt_unit_callbacks_t *cb;
5721 nxt_unit_request_info_t *req;
5722 nxt_unit_request_info_impl_t *req_impl;
5723
5724 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5725 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5726
5727 if (!ctx_impl->online) {
5728 return;
5729 }
5730
5731 ctx_impl->online = 0;
5732
5733 cb = &lib->callbacks;
5734
5735 if (cb->quit != NULL) {
5736 cb->quit(ctx);
5737 }
5738
5739 nxt_queue_each(req_impl, &ctx_impl->active_req,
5740 nxt_unit_request_info_impl_t, link)
5741 {
5742 req = &req_impl->req;
5743
5744 nxt_unit_req_warn(req, "active request on ctx quit");
5745
5746 if (cb->close_handler) {
5747 nxt_unit_req_debug(req, "close_handler");
5748
5749 cb->close_handler(req);
5750
5751 } else {
5752 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5753 }
5754
5755 } nxt_queue_loop;
5756
5757 if (ctx != &lib->main_ctx.ctx) {
5758 return;
5759 }
5760
5761 memset(&msg, 0, sizeof(nxt_port_msg_t));
5762
5763 msg.pid = lib->pid;
5764 msg.type = _NXT_PORT_MSG_QUIT;
5765
5766 pthread_mutex_lock(&lib->mutex);
5767
5768 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5769
5770 if (ctx == &ctx_impl->ctx
5771 || ctx_impl->read_port == NULL
5772 || ctx_impl->read_port->out_fd == -1)
5773 {
5774 continue;
5775 }
5776
5777 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5778 &msg, sizeof(msg), NULL, 0);
5779
5780 } nxt_queue_loop;
5781
5782 pthread_mutex_unlock(&lib->mutex);
5783}
5784
5785
5786static int

--- 297 unchanged lines hidden (view full) ---

6084{
6085 int res;
6086 nxt_unit_port_impl_t *port_impl;
6087
6088 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6089
6090retry:
6091
6092 res = nxt_unit_app_queue_recv(port, rbuf);
6093
6094 if (res == NXT_UNIT_AGAIN) {
6095 res = nxt_unit_port_recv(ctx, port, rbuf);
6096 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6097 return NXT_UNIT_ERROR;
6098 }
6099
6100 if (nxt_unit_is_read_queue(rbuf)) {
6101 nxt_app_queue_notification_received(port_impl->queue);

--- 87 unchanged lines hidden (view full) ---

6189
6190 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6191
6192 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6193}
6194
6195
6196static int
6197nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6198{
6199 uint32_t cookie;
6200 nxt_port_msg_t *port_msg;
6201 nxt_app_queue_t *queue;
6202 nxt_unit_port_impl_t *port_impl;
6203
6204 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6205 queue = port_impl->queue;
6206
6207retry:
6208
6209 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6210
6211 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6212
6213 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6214 port_msg = (nxt_port_msg_t *) rbuf->buf;
6215
6216 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6217 return NXT_UNIT_OK;
6218 }
6219
6220 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6221
6222 goto retry;
6223 }
6224

--- 481 unchanged lines hidden ---