nxt_unit.c (1545:78836321a126) nxt_unit.c (1546:06017e6e3a5f)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 37 unchanged lines hidden (view full) ---

46 nxt_unit_mmap_buf_t *mmap_buf);
47nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
50static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
52 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
53static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 37 unchanged lines hidden (view full) ---

46 nxt_unit_mmap_buf_t *mmap_buf);
47nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
50static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
52 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
53static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
54static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
54static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
55 nxt_unit_recv_msg_t *recv_msg);
56static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
57 nxt_unit_recv_msg_t *recv_msg);
58static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
59 nxt_unit_port_id_t *port_id);
60static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
61 nxt_unit_recv_msg_t *recv_msg);

--- 36 unchanged lines hidden (view full) ---

98 nxt_unit_port_t *port, uint32_t size,
99 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
100static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
101
102static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
103nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
104nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
105static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
55static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
56 nxt_unit_recv_msg_t *recv_msg);
57static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
58 nxt_unit_recv_msg_t *recv_msg);
59static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
60 nxt_unit_port_id_t *port_id);
61static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
62 nxt_unit_recv_msg_t *recv_msg);

--- 36 unchanged lines hidden (view full) ---

99 nxt_unit_port_t *port, uint32_t size,
100 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
101static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
102
103static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
104nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
105nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
106static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
106static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
107 nxt_unit_process_t *process, uint32_t id);
108static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
107static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
109 nxt_unit_recv_msg_t *recv_msg);
108 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
109static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
110 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
111 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
110static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
112static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
111 nxt_unit_recv_msg_t *recv_msg);
113 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
114static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
112static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
113 nxt_unit_process_t *process,
114 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
115static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
116
117static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
118 pid_t pid);
119static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,

--- 115 unchanged lines hidden (view full) ---

235
236 nxt_queue_link_t link;
237
238 nxt_unit_ctx_impl_t *ctx_impl;
239};
240
241
242struct nxt_unit_read_buf_s {
115static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
116 nxt_unit_process_t *process,
117 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
118static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
119
120static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
121 pid_t pid);
122static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,

--- 115 unchanged lines hidden (view full) ---

238
239 nxt_queue_link_t link;
240
241 nxt_unit_ctx_impl_t *ctx_impl;
242};
243
244
245struct nxt_unit_read_buf_s {
243 nxt_unit_read_buf_t *next;
246 nxt_queue_link_t link;
247 nxt_unit_ctx_impl_t *ctx_impl;
244 ssize_t size;
245 char buf[16384];
246 char oob[256];
247};
248
249
250struct nxt_unit_ctx_impl_s {
251 nxt_unit_ctx_t ctx;

--- 19 unchanged lines hidden (view full) ---

271 nxt_queue_t active_req;
272
273 /* of nxt_unit_request_info_impl_t */
274 nxt_lvlhsh_t requests;
275
276 /* of nxt_unit_request_info_impl_t */
277 nxt_queue_t ready_req;
278
248 ssize_t size;
249 char buf[16384];
250 char oob[256];
251};
252
253
254struct nxt_unit_ctx_impl_s {
255 nxt_unit_ctx_t ctx;

--- 19 unchanged lines hidden (view full) ---

275 nxt_queue_t active_req;
276
277 /* of nxt_unit_request_info_impl_t */
278 nxt_lvlhsh_t requests;
279
280 /* of nxt_unit_request_info_impl_t */
281 nxt_queue_t ready_req;
282
279 nxt_unit_read_buf_t *pending_read_head;
280 nxt_unit_read_buf_t **pending_read_tail;
281 nxt_unit_read_buf_t *free_read_buf;
283 /* of nxt_unit_read_buf_t */
284 nxt_queue_t pending_rbuf;
282
285
286 /* of nxt_unit_read_buf_t */
287 nxt_queue_t free_rbuf;
288
283 nxt_unit_mmap_buf_t ctx_buf[2];
284 nxt_unit_read_buf_t ctx_read_buf;
285
286 nxt_unit_request_info_impl_t req;
287};
288
289
290struct nxt_unit_impl_s {

--- 22 unchanged lines hidden (view full) ---

313};
314
315
316struct nxt_unit_port_impl_s {
317 nxt_unit_port_t port;
318
319 nxt_atomic_t use_count;
320
289 nxt_unit_mmap_buf_t ctx_buf[2];
290 nxt_unit_read_buf_t ctx_read_buf;
291
292 nxt_unit_request_info_impl_t req;
293};
294
295
296struct nxt_unit_impl_s {

--- 22 unchanged lines hidden (view full) ---

319};
320
321
322struct nxt_unit_port_impl_s {
323 nxt_unit_port_t port;
324
325 nxt_atomic_t use_count;
326
327 /* for nxt_unit_process_t.ports */
321 nxt_queue_link_t link;
322 nxt_unit_process_t *process;
323
324 /* of nxt_unit_request_info_impl_t */
325 nxt_queue_t awaiting_req;
326
327 int ready;
328};
329
330
331struct nxt_unit_mmap_s {
332 nxt_port_mmap_header_t *hdr;
328 nxt_queue_link_t link;
329 nxt_unit_process_t *process;
330
331 /* of nxt_unit_request_info_impl_t */
332 nxt_queue_t awaiting_req;
333
334 int ready;
335};
336
337
338struct nxt_unit_mmap_s {
339 nxt_port_mmap_header_t *hdr;
340
341 /* of nxt_unit_read_buf_t */
342 nxt_queue_t awaiting_rbuf;
333};
334
335
336struct nxt_unit_mmaps_s {
337 pthread_mutex_t mutex;
338 uint32_t size;
339 uint32_t cap;
340 nxt_atomic_t allocated_chunks;
341 nxt_unit_mmap_t *elts;
342};
343
344
345struct nxt_unit_process_s {
346 pid_t pid;
347
343};
344
345
346struct nxt_unit_mmaps_s {
347 pthread_mutex_t mutex;
348 uint32_t size;
349 uint32_t cap;
350 nxt_atomic_t allocated_chunks;
351 nxt_unit_mmap_t *elts;
352};
353
354
355struct nxt_unit_process_s {
356 pid_t pid;
357
348 nxt_queue_t ports;
358 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
349
350 nxt_unit_mmaps_t incoming;
351 nxt_unit_mmaps_t outgoing;
352
353 nxt_unit_impl_t *lib;
354
355 nxt_atomic_t use_count;
356

--- 175 unchanged lines hidden (view full) ---

532
533 ctx_impl->use_count = 1;
534 ctx_impl->wait_items = 0;
535
536 nxt_queue_init(&ctx_impl->free_req);
537 nxt_queue_init(&ctx_impl->free_ws);
538 nxt_queue_init(&ctx_impl->active_req);
539 nxt_queue_init(&ctx_impl->ready_req);
359
360 nxt_unit_mmaps_t incoming;
361 nxt_unit_mmaps_t outgoing;
362
363 nxt_unit_impl_t *lib;
364
365 nxt_atomic_t use_count;
366

--- 175 unchanged lines hidden (view full) ---

542
543 ctx_impl->use_count = 1;
544 ctx_impl->wait_items = 0;
545
546 nxt_queue_init(&ctx_impl->free_req);
547 nxt_queue_init(&ctx_impl->free_ws);
548 nxt_queue_init(&ctx_impl->active_req);
549 nxt_queue_init(&ctx_impl->ready_req);
550 nxt_queue_init(&ctx_impl->pending_rbuf);
551 nxt_queue_init(&ctx_impl->free_rbuf);
540
541 ctx_impl->free_buf = NULL;
542 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
543 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
544
545 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
552
553 ctx_impl->free_buf = NULL;
554 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
555 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
556
557 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
558 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
546
559
547 ctx_impl->pending_read_head = NULL;
548 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
549 ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
550 ctx_impl->ctx_read_buf.next = NULL;
560 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
551
552 ctx_impl->req.req.ctx = &ctx_impl->ctx;
553 ctx_impl->req.req.unit = &lib->unit;
554
555 ctx_impl->read_port = NULL;
556 ctx_impl->requests.slot = 0;
557
558 return NXT_UNIT_OK;

--- 203 unchanged lines hidden (view full) ---

762 if (res != sizeof(msg)) {
763 return NXT_UNIT_ERROR;
764 }
765
766 return NXT_UNIT_OK;
767}
768
769
561
562 ctx_impl->req.req.ctx = &ctx_impl->ctx;
563 ctx_impl->req.req.unit = &lib->unit;
564
565 ctx_impl->read_port = NULL;
566 ctx_impl->requests.slot = 0;
567
568 return NXT_UNIT_OK;

--- 203 unchanged lines hidden (view full) ---

772 if (res != sizeof(msg)) {
773 return NXT_UNIT_ERROR;
774 }
775
776 return NXT_UNIT_OK;
777}
778
779
770int
771nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
772 void *buf, size_t buf_size, void *oob, size_t oob_size)
780static int
781nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
773{
774 int rc;
775 pid_t pid;
776 struct cmsghdr *cm;
777 nxt_port_msg_t *port_msg;
778 nxt_unit_impl_t *lib;
779 nxt_unit_recv_msg_t recv_msg;
780
781 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
782
783 rc = NXT_UNIT_ERROR;
784 recv_msg.fd = -1;
785 recv_msg.process = NULL;
782{
783 int rc;
784 pid_t pid;
785 struct cmsghdr *cm;
786 nxt_port_msg_t *port_msg;
787 nxt_unit_impl_t *lib;
788 nxt_unit_recv_msg_t recv_msg;
789
790 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
791
792 rc = NXT_UNIT_ERROR;
793 recv_msg.fd = -1;
794 recv_msg.process = NULL;
786 port_msg = buf;
787 cm = oob;
795 port_msg = (nxt_port_msg_t *) rbuf->buf;
796 cm = (struct cmsghdr *) rbuf->oob;
788
797
789 if (oob_size >= CMSG_SPACE(sizeof(int))
790 && cm->cmsg_len == CMSG_LEN(sizeof(int))
798 if (cm->cmsg_len == CMSG_LEN(sizeof(int))
791 && cm->cmsg_level == SOL_SOCKET
792 && cm->cmsg_type == SCM_RIGHTS)
793 {
794 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
795 }
796
797 recv_msg.incoming_buf = NULL;
798
799 && cm->cmsg_level == SOL_SOCKET
800 && cm->cmsg_type == SCM_RIGHTS)
801 {
802 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
803 }
804
805 recv_msg.incoming_buf = NULL;
806
799 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
800 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
807 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
808 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
801 goto fail;
802 }
803
804 recv_msg.stream = port_msg->stream;
805 recv_msg.pid = port_msg->pid;
806 recv_msg.reply_port = port_msg->reply_port;
807 recv_msg.last = port_msg->last;
808 recv_msg.mmap = port_msg->mmap;
809
810 recv_msg.start = port_msg + 1;
809 goto fail;
810 }
811
812 recv_msg.stream = port_msg->stream;
813 recv_msg.pid = port_msg->pid;
814 recv_msg.reply_port = port_msg->reply_port;
815 recv_msg.last = port_msg->last;
816 recv_msg.mmap = port_msg->mmap;
817
818 recv_msg.start = port_msg + 1;
811 recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
819 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
812
813 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
814 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
815 port_msg->stream, (int) port_msg->type);
816 goto fail;
817 }
818
820
821 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
822 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
823 port_msg->stream, (int) port_msg->type);
824 goto fail;
825 }
826
819 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
820 rc = NXT_UNIT_OK;
827 if (port_msg->tracking) {
828 rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
821
829
822 goto fail;
830 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
831 if (rc == NXT_UNIT_AGAIN) {
832 recv_msg.fd = -1;
833 }
834
835 goto fail;
836 }
823 }
824
825 /* Fragmentation is unsupported. */
826 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
827 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
828 port_msg->stream, (int) port_msg->type);
829 goto fail;
830 }
831
832 if (port_msg->mmap) {
837 }
838
839 /* Fragmentation is unsupported. */
840 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
841 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
842 port_msg->stream, (int) port_msg->type);
843 goto fail;
844 }
845
846 if (port_msg->mmap) {
833 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
847 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
848
849 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
850 if (rc == NXT_UNIT_AGAIN) {
851 recv_msg.fd = -1;
852 }
853
834 goto fail;
835 }
836 }
837
838 switch (port_msg->type) {
839
840 case _NXT_PORT_MSG_QUIT:
841 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

--- 230 unchanged lines hidden (view full) ---

1072 (char *) nxt_unit_sptr_get(&r->method),
1073 (int) r->target_length,
1074 (char *) nxt_unit_sptr_get(&r->target),
1075 (int) r->content_length);
1076
1077 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1078
1079 res = nxt_unit_request_check_response_port(req, &port_id);
854 goto fail;
855 }
856 }
857
858 switch (port_msg->type) {
859
860 case _NXT_PORT_MSG_QUIT:
861 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

--- 230 unchanged lines hidden (view full) ---

1092 (char *) nxt_unit_sptr_get(&r->method),
1093 (int) r->target_length,
1094 (char *) nxt_unit_sptr_get(&r->target),
1095 (int) r->content_length);
1096
1097 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1098
1099 res = nxt_unit_request_check_response_port(req, &port_id);
1100 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1101 return NXT_UNIT_ERROR;
1102 }
1080
1081 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1082 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1083
1084 lib->callbacks.request_handler(req);
1085 }
1086
1087 return NXT_UNIT_OK;

--- 1283 unchanged lines hidden (view full) ---

2371 }
2372}
2373
2374
2375static nxt_unit_read_buf_t *
2376nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2377{
2378 nxt_unit_ctx_impl_t *ctx_impl;
1103
1104 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1105 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1106
1107 lib->callbacks.request_handler(req);
1108 }
1109
1110 return NXT_UNIT_OK;

--- 1283 unchanged lines hidden (view full) ---

2394 }
2395}
2396
2397
2398static nxt_unit_read_buf_t *
2399nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2400{
2401 nxt_unit_ctx_impl_t *ctx_impl;
2402 nxt_unit_read_buf_t *rbuf;
2379
2380 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2381
2382 pthread_mutex_lock(&ctx_impl->mutex);
2383
2403
2404 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2405
2406 pthread_mutex_lock(&ctx_impl->mutex);
2407
2384 return nxt_unit_read_buf_get_impl(ctx_impl);
2408 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2409
2410 pthread_mutex_unlock(&ctx_impl->mutex);
2411
2412 return rbuf;
2385}
2386
2387
2388static nxt_unit_read_buf_t *
2389nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2390{
2413}
2414
2415
2416static nxt_unit_read_buf_t *
2417nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2418{
2419 nxt_queue_link_t *link;
2391 nxt_unit_read_buf_t *rbuf;
2392
2420 nxt_unit_read_buf_t *rbuf;
2421
2393 if (ctx_impl->free_read_buf != NULL) {
2394 rbuf = ctx_impl->free_read_buf;
2395 ctx_impl->free_read_buf = rbuf->next;
2422 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2423 link = nxt_queue_first(&ctx_impl->free_rbuf);
2424 nxt_queue_remove(link);
2396
2425
2397 pthread_mutex_unlock(&ctx_impl->mutex);
2426 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2398
2399 return rbuf;
2400 }
2401
2427
2428 return rbuf;
2429 }
2430
2402 pthread_mutex_unlock(&ctx_impl->mutex);
2403
2404 rbuf = malloc(sizeof(nxt_unit_read_buf_t));
2405
2431 rbuf = malloc(sizeof(nxt_unit_read_buf_t));
2432
2433 if (nxt_fast_path(rbuf != NULL)) {
2434 rbuf->ctx_impl = ctx_impl;
2435 }
2436
2406 return rbuf;
2407}
2408
2409
2410static void
2411nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2412 nxt_unit_read_buf_t *rbuf)
2413{
2414 nxt_unit_ctx_impl_t *ctx_impl;
2415
2416 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2417
2418 pthread_mutex_lock(&ctx_impl->mutex);
2419
2437 return rbuf;
2438}
2439
2440
2441static void
2442nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2443 nxt_unit_read_buf_t *rbuf)
2444{
2445 nxt_unit_ctx_impl_t *ctx_impl;
2446
2447 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2448
2449 pthread_mutex_lock(&ctx_impl->mutex);
2450
2420 rbuf->next = ctx_impl->free_read_buf;
2421 ctx_impl->free_read_buf = rbuf;
2451 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2422
2423 pthread_mutex_unlock(&ctx_impl->mutex);
2424}
2425
2426
2427nxt_unit_buf_t *
2428nxt_unit_buf_next(nxt_unit_buf_t *buf)
2429{

--- 820 unchanged lines hidden (view full) ---

3250 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3251 nxt_unit_read_buf_release(ctx, rbuf);
3252
3253 break;
3254 }
3255
3256 pthread_mutex_lock(&ctx_impl->mutex);
3257
2452
2453 pthread_mutex_unlock(&ctx_impl->mutex);
2454}
2455
2456
2457nxt_unit_buf_t *
2458nxt_unit_buf_next(nxt_unit_buf_t *buf)
2459{

--- 820 unchanged lines hidden (view full) ---

3280 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3281 nxt_unit_read_buf_release(ctx, rbuf);
3282
3283 break;
3284 }
3285
3286 pthread_mutex_lock(&ctx_impl->mutex);
3287
3258 *ctx_impl->pending_read_tail = rbuf;
3259 ctx_impl->pending_read_tail = &rbuf->next;
3260 rbuf->next = NULL;
3288 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3261
3262 pthread_mutex_unlock(&ctx_impl->mutex);
3263
3264 if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3265 nxt_unit_debug(ctx, "oosm: quit received");
3266
3267 return NXT_UNIT_ERROR;
3268 }
3269 }
3270
3271 return NXT_UNIT_OK;
3272}
3273
3274
3275static nxt_unit_mmap_t *
3276nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3277{
3289
3290 pthread_mutex_unlock(&ctx_impl->mutex);
3291
3292 if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3293 nxt_unit_debug(ctx, "oosm: quit received");
3294
3295 return NXT_UNIT_ERROR;
3296 }
3297 }
3298
3299 return NXT_UNIT_OK;
3300}
3301
3302
3303static nxt_unit_mmap_t *
3304nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3305{
3278 uint32_t cap;
3306 uint32_t cap, n;
3307 nxt_unit_mmap_t *e;
3279
3308
3309 if (nxt_fast_path(mmaps->size > i)) {
3310 return mmaps->elts + i;
3311 }
3312
3280 cap = mmaps->cap;
3281
3282 if (cap == 0) {
3283 cap = i + 1;
3284 }
3285
3286 while (i + 1 > cap) {
3287
3288 if (cap < 16) {
3289 cap = cap * 2;
3290
3291 } else {
3292 cap = cap + cap / 2;
3293 }
3294 }
3295
3296 if (cap != mmaps->cap) {
3297
3313 cap = mmaps->cap;
3314
3315 if (cap == 0) {
3316 cap = i + 1;
3317 }
3318
3319 while (i + 1 > cap) {
3320
3321 if (cap < 16) {
3322 cap = cap * 2;
3323
3324 } else {
3325 cap = cap + cap / 2;
3326 }
3327 }
3328
3329 if (cap != mmaps->cap) {
3330
3298 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
3299 if (nxt_slow_path(mmaps->elts == NULL)) {
3331 e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3332 if (nxt_slow_path(e == NULL)) {
3300 return NULL;
3301 }
3302
3333 return NULL;
3334 }
3335
3303 memset(mmaps->elts + mmaps->cap, 0,
3304 sizeof(*mmaps->elts) * (cap - mmaps->cap));
3336 mmaps->elts = e;
3305
3337
3338 for (n = mmaps->cap; n < cap; n++) {
3339 e = mmaps->elts + n;
3340
3341 e->hdr = NULL;
3342 nxt_queue_init(&e->awaiting_rbuf);
3343 }
3344
3306 mmaps->cap = cap;
3307 }
3308
3309 if (i + 1 > mmaps->size) {
3310 mmaps->size = i + 1;
3311 }
3312
3313 return mmaps->elts + i;

--- 262 unchanged lines hidden (view full) ---

3576
3577 return NXT_UNIT_OK;
3578}
3579
3580
3581static int
3582nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3583{
3345 mmaps->cap = cap;
3346 }
3347
3348 if (i + 1 > mmaps->size) {
3349 mmaps->size = i + 1;
3350 }
3351
3352 return mmaps->elts + i;

--- 262 unchanged lines hidden (view full) ---

3615
3616 return NXT_UNIT_OK;
3617}
3618
3619
3620static int
3621nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3622{
3584 int rc;
3585 void *mem;
3586 struct stat mmap_stat;
3587 nxt_unit_mmap_t *mm;
3588 nxt_unit_impl_t *lib;
3589 nxt_unit_process_t *process;
3590 nxt_port_mmap_header_t *hdr;
3623 int rc;
3624 void *mem;
3625 nxt_queue_t awaiting_rbuf;
3626 struct stat mmap_stat;
3627 nxt_unit_mmap_t *mm;
3628 nxt_unit_impl_t *lib;
3629 nxt_unit_process_t *process;
3630 nxt_unit_ctx_impl_t *ctx_impl;
3631 nxt_unit_read_buf_t *rbuf;
3632 nxt_port_mmap_header_t *hdr;
3591
3592 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3593
3594 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3595
3596 pthread_mutex_lock(&lib->mutex);
3597
3598 process = nxt_unit_process_find(lib, pid, 0);

--- 22 unchanged lines hidden (view full) ---

3621 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3622 strerror(errno), errno);
3623
3624 goto fail;
3625 }
3626
3627 hdr = mem;
3628
3633
3634 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3635
3636 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3637
3638 pthread_mutex_lock(&lib->mutex);
3639
3640 process = nxt_unit_process_find(lib, pid, 0);

--- 22 unchanged lines hidden (view full) ---

3663 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3664 strerror(errno), errno);
3665
3666 goto fail;
3667 }
3668
3669 hdr = mem;
3670
3629 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
3671 if (nxt_slow_path(hdr->src_pid != pid)) {
3630
3631 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
3632 "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3633 (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3634
3635 munmap(mem, PORT_MMAP_SIZE);
3636
3637 goto fail;
3638 }
3639
3672
3673 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
3674 "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3675 (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3676
3677 munmap(mem, PORT_MMAP_SIZE);
3678
3679 goto fail;
3680 }
3681
3682 nxt_queue_init(&awaiting_rbuf);
3683
3640 pthread_mutex_lock(&process->incoming.mutex);
3641
3642 mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
3643 if (nxt_slow_path(mm == NULL)) {
3644 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
3645
3646 munmap(mem, PORT_MMAP_SIZE);
3647
3648 } else {
3649 mm->hdr = hdr;
3650
3651 hdr->sent_over = 0xFFFFu;
3652
3684 pthread_mutex_lock(&process->incoming.mutex);
3685
3686 mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
3687 if (nxt_slow_path(mm == NULL)) {
3688 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
3689
3690 munmap(mem, PORT_MMAP_SIZE);
3691
3692 } else {
3693 mm->hdr = hdr;
3694
3695 hdr->sent_over = 0xFFFFu;
3696
3697 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
3698 nxt_queue_init(&mm->awaiting_rbuf);
3699
3653 rc = NXT_UNIT_OK;
3654 }
3655
3656 pthread_mutex_unlock(&process->incoming.mutex);
3657
3700 rc = NXT_UNIT_OK;
3701 }
3702
3703 pthread_mutex_unlock(&process->incoming.mutex);
3704
3705 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
3706
3707 ctx_impl = rbuf->ctx_impl;
3708
3709 pthread_mutex_lock(&ctx_impl->mutex);
3710
3711 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
3712
3713 pthread_mutex_unlock(&ctx_impl->mutex);
3714
3715 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
3716
3717 } nxt_queue_loop;
3718
3658fail:
3659
3660 nxt_unit_process_release(process);
3661
3662 return rc;
3663}
3664
3665

--- 48 unchanged lines hidden (view full) ---

3714
3715 free(mmaps->elts);
3716 }
3717
3718 pthread_mutex_destroy(&mmaps->mutex);
3719}
3720
3721
3719fail:
3720
3721 nxt_unit_process_release(process);
3722
3723 return rc;
3724}
3725
3726

--- 48 unchanged lines hidden (view full) ---

3775
3776 free(mmaps->elts);
3777 }
3778
3779 pthread_mutex_destroy(&mmaps->mutex);
3780}
3781
3782
3722static nxt_port_mmap_header_t *
3723nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3724 uint32_t id)
3725{
3726 nxt_port_mmap_header_t *hdr;
3727
3728 if (nxt_fast_path(process->incoming.size > id)) {
3729 hdr = process->incoming.elts[id].hdr;
3730
3731 } else {
3732 hdr = NULL;
3733 }
3734
3735 return hdr;
3736}
3737
3738
3739static int
3783static int
3740nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3784nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
3785 nxt_unit_read_buf_t *rbuf)
3741{
3786{
3742 int rc;
3787 int res;
3743 nxt_chunk_id_t c;
3744 nxt_unit_process_t *process;
3745 nxt_port_mmap_header_t *hdr;
3746 nxt_port_mmap_tracking_msg_t *tracking_msg;
3747
3748 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3749 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3750 recv_msg->stream, (int) recv_msg->size);
3751
3788 nxt_chunk_id_t c;
3789 nxt_unit_process_t *process;
3790 nxt_port_mmap_header_t *hdr;
3791 nxt_port_mmap_tracking_msg_t *tracking_msg;
3792
3793 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3794 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3795 recv_msg->stream, (int) recv_msg->size);
3796
3752 return 0;
3797 return NXT_UNIT_ERROR;
3753 }
3754
3755 tracking_msg = recv_msg->start;
3756
3757 recv_msg->start = tracking_msg + 1;
3758 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3759
3760 process = nxt_unit_msg_get_process(ctx, recv_msg);
3761 if (nxt_slow_path(process == NULL)) {
3798 }
3799
3800 tracking_msg = recv_msg->start;
3801
3802 recv_msg->start = tracking_msg + 1;
3803 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3804
3805 process = nxt_unit_msg_get_process(ctx, recv_msg);
3806 if (nxt_slow_path(process == NULL)) {
3762 return 0;
3807 return NXT_UNIT_ERROR;
3763 }
3764
3765 pthread_mutex_lock(&process->incoming.mutex);
3766
3808 }
3809
3810 pthread_mutex_lock(&process->incoming.mutex);
3811
3767 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
3768 if (nxt_slow_path(hdr == NULL)) {
3769 pthread_mutex_unlock(&process->incoming.mutex);
3812 res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
3813 recv_msg->pid, tracking_msg->mmap_id,
3814 &hdr, rbuf);
3770
3815
3771 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
3772 "invalid mmap id %d,%"PRIu32,
3773 recv_msg->stream, (int) process->pid,
3774 tracking_msg->mmap_id);
3775
3776 return 0;
3816 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3817 return res;
3777 }
3778
3779 c = tracking_msg->tracking_id;
3818 }
3819
3820 c = tracking_msg->tracking_id;
3780 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3821 res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3781
3822
3782 if (rc == 0) {
3823 if (res == 0) {
3783 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3784 recv_msg->stream);
3785
3786 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3824 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3825 recv_msg->stream);
3826
3827 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3828
3829 res = NXT_UNIT_CANCELLED;
3830
3831 } else {
3832 res = NXT_UNIT_OK;
3787 }
3788
3789 pthread_mutex_unlock(&process->incoming.mutex);
3790
3833 }
3834
3835 pthread_mutex_unlock(&process->incoming.mutex);
3836
3791 return rc;
3837 return res;
3792}
3793
3794
3795static int
3838}
3839
3840
3841static int
3796nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3842nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
3843 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
3844 nxt_unit_read_buf_t *rbuf)
3797{
3845{
3846 int res, need_rbuf;
3847 nxt_unit_mmap_t *mm;
3848 nxt_unit_ctx_impl_t *ctx_impl;
3849
3850 mm = nxt_unit_mmap_at(mmaps, id);
3851 if (nxt_slow_path(mm == NULL)) {
3852 nxt_unit_alert(ctx, "failed to allocate mmap");
3853
3854 pthread_mutex_unlock(&mmaps->mutex);
3855
3856 *hdr = NULL;
3857
3858 return NXT_UNIT_ERROR;
3859 }
3860
3861 *hdr = mm->hdr;
3862
3863 if (nxt_fast_path(*hdr != NULL)) {
3864 return NXT_UNIT_OK;
3865 }
3866
3867 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
3868
3869 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
3870
3871 pthread_mutex_unlock(&mmaps->mutex);
3872
3873 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3874
3875 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
3876
3877 if (need_rbuf) {
3878 res = nxt_unit_get_mmap(ctx, pid, id);
3879 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
3880 return NXT_UNIT_ERROR;
3881 }
3882 }
3883
3884 return NXT_UNIT_AGAIN;
3885}
3886
3887
3888static int
3889nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
3890 nxt_unit_read_buf_t *rbuf)
3891{
3892 int res;
3798 void *start;
3799 uint32_t size;
3893 void *start;
3894 uint32_t size;
3895 nxt_unit_mmaps_t *mmaps;
3800 nxt_unit_process_t *process;
3801 nxt_unit_mmap_buf_t *b, **incoming_tail;
3802 nxt_port_mmap_msg_t *mmap_msg, *end;
3803 nxt_port_mmap_header_t *hdr;
3804
3805 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
3806 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
3807 recv_msg->stream, (int) recv_msg->size);

--- 6 unchanged lines hidden (view full) ---

3814 return NXT_UNIT_ERROR;
3815 }
3816
3817 mmap_msg = recv_msg->start;
3818 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3819
3820 incoming_tail = &recv_msg->incoming_buf;
3821
3896 nxt_unit_process_t *process;
3897 nxt_unit_mmap_buf_t *b, **incoming_tail;
3898 nxt_port_mmap_msg_t *mmap_msg, *end;
3899 nxt_port_mmap_header_t *hdr;
3900
3901 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
3902 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
3903 recv_msg->stream, (int) recv_msg->size);

--- 6 unchanged lines hidden (view full) ---

3910 return NXT_UNIT_ERROR;
3911 }
3912
3913 mmap_msg = recv_msg->start;
3914 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3915
3916 incoming_tail = &recv_msg->incoming_buf;
3917
3918 /* Allocating buffer structures. */
3822 for (; mmap_msg < end; mmap_msg++) {
3823 b = nxt_unit_mmap_buf_get(ctx);
3824 if (nxt_slow_path(b == NULL)) {
3825 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3826 recv_msg->stream);
3827
3919 for (; mmap_msg < end; mmap_msg++) {
3920 b = nxt_unit_mmap_buf_get(ctx);
3921 if (nxt_slow_path(b == NULL)) {
3922 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3923 recv_msg->stream);
3924
3925 while (recv_msg->incoming_buf != NULL) {
3926 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
3927 }
3928
3828 return NXT_UNIT_ERROR;
3829 }
3830
3831 nxt_unit_mmap_buf_insert(incoming_tail, b);
3832 incoming_tail = &b->next;
3833 }
3834
3835 b = recv_msg->incoming_buf;
3836 mmap_msg = recv_msg->start;
3837
3929 return NXT_UNIT_ERROR;
3930 }
3931
3932 nxt_unit_mmap_buf_insert(incoming_tail, b);
3933 incoming_tail = &b->next;
3934 }
3935
3936 b = recv_msg->incoming_buf;
3937 mmap_msg = recv_msg->start;
3938
3838 pthread_mutex_lock(&process->incoming.mutex);
3939 mmaps = &process->incoming;
3839
3940
3941 pthread_mutex_lock(&mmaps->mutex);
3942
3840 for (; mmap_msg < end; mmap_msg++) {
3943 for (; mmap_msg < end; mmap_msg++) {
3841 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
3842 if (nxt_slow_path(hdr == NULL)) {
3843 pthread_mutex_unlock(&process->incoming.mutex);
3944 res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
3945 recv_msg->pid, mmap_msg->mmap_id,
3946 &hdr, rbuf);
3844
3947
3845 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
3846 "invalid mmap id %d,%"PRIu32,
3847 recv_msg->stream, (int) process->pid,
3848 mmap_msg->mmap_id);
3948 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3949 while (recv_msg->incoming_buf != NULL) {
3950 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
3951 }
3849
3952
3850 return NXT_UNIT_ERROR;
3953 return res;
3851 }
3852
3853 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3854 size = mmap_msg->size;
3855
3856 if (recv_msg->start == mmap_msg) {
3857 recv_msg->start = start;
3858 recv_msg->size = size;

--- 10 unchanged lines hidden (view full) ---

3869 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3870 recv_msg->stream,
3871 start, (int) size,
3872 (int) hdr->src_pid, (int) hdr->dst_pid,
3873 (int) hdr->id, (int) mmap_msg->chunk_id,
3874 (int) mmap_msg->size);
3875 }
3876
3954 }
3955
3956 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3957 size = mmap_msg->size;
3958
3959 if (recv_msg->start == mmap_msg) {
3960 recv_msg->start = start;
3961 recv_msg->size = size;

--- 10 unchanged lines hidden (view full) ---

3972 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3973 recv_msg->stream,
3974 start, (int) size,
3975 (int) hdr->src_pid, (int) hdr->dst_pid,
3976 (int) hdr->id, (int) mmap_msg->chunk_id,
3977 (int) mmap_msg->size);
3978 }
3979
3877 pthread_mutex_unlock(&process->incoming.mutex);
3980 pthread_mutex_unlock(&mmaps->mutex);
3878
3879 return NXT_UNIT_OK;
3880}
3881
3882
3981
3982 return NXT_UNIT_OK;
3983}
3984
3985
3986static int
3987nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
3988{
3989 ssize_t res;
3990 nxt_unit_impl_t *lib;
3991 nxt_unit_ctx_impl_t *ctx_impl;
3992
3993 struct {
3994 nxt_port_msg_t msg;
3995 nxt_port_msg_get_mmap_t get_mmap;
3996 } m;
3997
3998 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3999 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4000
4001 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4002
4003 m.msg.pid = lib->pid;
4004 m.msg.reply_port = ctx_impl->read_port->id.id;
4005 m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4006
4007 m.get_mmap.id = id;
4008
4009 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4010
4011 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
4012 if (nxt_slow_path(res != sizeof(m))) {
4013 return NXT_UNIT_ERROR;
4014 }
4015
4016 return NXT_UNIT_OK;
4017}
4018
4019
3883static void
3884nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
3885 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
3886 void *start, uint32_t size)
3887{
3888 int freed_chunks;
3889 u_char *p, *end;
3890 nxt_chunk_id_t c;

--- 214 unchanged lines hidden (view full) ---

4105 return rc;
4106}
4107
4108
4109int
4110nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4111{
4112 int rc;
4020static void
4021nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
4022 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
4023 void *start, uint32_t size)
4024{
4025 int freed_chunks;
4026 u_char *p, *end;
4027 nxt_chunk_id_t c;

--- 214 unchanged lines hidden (view full) ---

4242 return rc;
4243}
4244
4245
4246int
4247nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4248{
4249 int rc;
4250 nxt_queue_link_t *link;
4113 nxt_unit_ctx_impl_t *ctx_impl;
4114 nxt_unit_read_buf_t *rbuf;
4115
4116 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4117
4118 nxt_unit_ctx_use(ctx_impl);
4119
4120 pthread_mutex_lock(&ctx_impl->mutex);
4121
4251 nxt_unit_ctx_impl_t *ctx_impl;
4252 nxt_unit_read_buf_t *rbuf;
4253
4254 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4255
4256 nxt_unit_ctx_use(ctx_impl);
4257
4258 pthread_mutex_lock(&ctx_impl->mutex);
4259
4122 if (ctx_impl->pending_read_head != NULL) {
4123 rbuf = ctx_impl->pending_read_head;
4124 ctx_impl->pending_read_head = rbuf->next;
4260 if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4125
4261
4126 if (ctx_impl->pending_read_tail == &rbuf->next) {
4127 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
4128 }
4262next_pending:
4129
4263
4264 link = nxt_queue_first(&ctx_impl->pending_rbuf);
4265 nxt_queue_remove(link);
4266
4267 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
4268
4130 pthread_mutex_unlock(&ctx_impl->mutex);
4131
4132 } else {
4133 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
4269 pthread_mutex_unlock(&ctx_impl->mutex);
4270
4271 } else {
4272 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
4273
4274 pthread_mutex_unlock(&ctx_impl->mutex);
4275
4134 if (nxt_slow_path(rbuf == NULL)) {
4135
4136 nxt_unit_ctx_release(ctx_impl);
4137
4138 return NXT_UNIT_ERROR;
4139 }
4140
4141 nxt_unit_read_buf(ctx, rbuf);
4142 }
4143
4144 if (nxt_fast_path(rbuf->size > 0)) {
4276 if (nxt_slow_path(rbuf == NULL)) {
4277
4278 nxt_unit_ctx_release(ctx_impl);
4279
4280 return NXT_UNIT_ERROR;
4281 }
4282
4283 nxt_unit_read_buf(ctx, rbuf);
4284 }
4285
4286 if (nxt_fast_path(rbuf->size > 0)) {
4145 rc = nxt_unit_process_msg(ctx,
4146 rbuf->buf, rbuf->size,
4147 rbuf->oob, sizeof(rbuf->oob));
4287 rc = nxt_unit_process_msg(ctx, rbuf);
4148
4149#if (NXT_DEBUG)
4288
4289#if (NXT_DEBUG)
4150 memset(rbuf->buf, 0xAC, rbuf->size);
4290 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
4291 memset(rbuf->buf, 0xAC, rbuf->size);
4292 }
4151#endif
4152
4153 } else {
4154 rc = NXT_UNIT_ERROR;
4155 }
4156
4293#endif
4294
4295 } else {
4296 rc = NXT_UNIT_ERROR;
4297 }
4298
4157 nxt_unit_read_buf_release(ctx, rbuf);
4299 if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
4300 rc = NXT_UNIT_OK;
4158
4301
4159 nxt_unit_process_ready_req(ctx_impl);
4302 } else {
4303 nxt_unit_read_buf_release(ctx, rbuf);
4304 }
4160
4305
4306 if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
4307 rc = NXT_UNIT_OK;
4308 }
4309
4310 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
4311 pthread_mutex_lock(&ctx_impl->mutex);
4312
4313 if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4314 goto next_pending;
4315 }
4316
4317 pthread_mutex_unlock(&ctx_impl->mutex);
4318
4319 nxt_unit_process_ready_req(ctx_impl);
4320 }
4321
4161 nxt_unit_ctx_release(ctx_impl);
4162
4163 return rc;
4164}
4165
4166
4167static void
4168nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)

--- 1133 unchanged lines hidden ---
4322 nxt_unit_ctx_release(ctx_impl);
4323
4324 return rc;
4325}
4326
4327
4328static void
4329nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)

--- 1133 unchanged lines hidden ---