Lines Matching refs:ctx

47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
79 nxt_unit_ctx_t *ctx);
83 nxt_unit_ctx_t *ctx);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
288 nxt_unit_ctx_t ctx; member
432 nxt_unit_ctx_t *ctx; in nxt_unit_init() local
489 ctx = &lib->main_ctx.ctx; in nxt_unit_init()
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); in nxt_unit_init()
503 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); in nxt_unit_init()
511 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, in nxt_unit_init()
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); in nxt_unit_init()
542 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd, in nxt_unit_init()
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); in nxt_unit_init()
557 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); in nxt_unit_init()
567 return ctx; in nxt_unit_init()
579 nxt_unit_ctx_release(&lib->main_ctx.ctx); in nxt_unit_init()
675 ctx_impl->ctx.data = data; in nxt_unit_ctx_init()
676 ctx_impl->ctx.unit = &lib->unit; in nxt_unit_ctx_init()
715 ctx_impl->req.req.ctx = &ctx_impl->ctx; in nxt_unit_ctx_init()
726 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) in nxt_unit_ctx_use() argument
730 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_ctx_use()
737 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) in nxt_unit_ctx_release() argument
742 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_ctx_release()
942 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) in nxt_unit_ready() argument
950 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ready()
963 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob); in nxt_unit_ready()
973 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, in nxt_unit_process_msg() argument
983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_msg()
991 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg"); in nxt_unit_process_msg()
998 nxt_unit_debug(ctx, "read port closed"); in nxt_unit_process_msg()
1000 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); in nxt_unit_process_msg()
1005 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); in nxt_unit_process_msg()
1013 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", in nxt_unit_process_msg()
1027 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", in nxt_unit_process_msg()
1035 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", in nxt_unit_process_msg()
1042 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); in nxt_unit_process_msg()
1068 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, in nxt_unit_process_msg()
1071 nxt_unit_quit(ctx, quit_param); in nxt_unit_process_msg()
1077 rc = nxt_unit_process_new_port(ctx, &recv_msg); in nxt_unit_process_msg()
1081 rc = nxt_unit_ctx_ready(ctx); in nxt_unit_process_msg()
1085 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", in nxt_unit_process_msg()
1089 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", in nxt_unit_process_msg()
1102 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", in nxt_unit_process_msg()
1109 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); in nxt_unit_process_msg()
1113 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); in nxt_unit_process_msg()
1117 rc = nxt_unit_process_req_body(ctx, &recv_msg); in nxt_unit_process_msg()
1121 rc = nxt_unit_process_websocket(ctx, &recv_msg); in nxt_unit_process_msg()
1126 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " in nxt_unit_process_msg()
1136 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", in nxt_unit_process_msg()
1145 rc = nxt_unit_process_shm_ack(ctx); in nxt_unit_process_msg()
1149 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", in nxt_unit_process_msg()
1174 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_process_msg()
1182 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) in nxt_unit_process_new_port() argument
1189 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " in nxt_unit_process_new_port()
1197 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", in nxt_unit_process_new_port()
1205 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d", in nxt_unit_process_new_port()
1222 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], in nxt_unit_process_new_port()
1232 port = nxt_unit_add_port(ctx, &new_port, mem); in nxt_unit_process_new_port()
1244 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) in nxt_unit_ctx_ready() argument
1249 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_ctx_ready()
1257 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_ready()
1261 return lib->callbacks.ready_handler(ctx); in nxt_unit_ctx_ready()
1269 nxt_unit_quit(ctx, lib->main_ctx.quit_param); in nxt_unit_ctx_ready()
1275 lib->callbacks.add_port(ctx, lib->shared_port); in nxt_unit_ctx_ready()
1284 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, in nxt_unit_process_req_headers() argument
1296 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", in nxt_unit_process_req_headers()
1303 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " in nxt_unit_process_req_headers()
1310 req_impl = nxt_unit_request_info_get(ctx); in nxt_unit_process_req_headers()
1312 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", in nxt_unit_process_req_headers()
1356 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, in nxt_unit_process_req_headers()
1378 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_headers()
1383 res = nxt_unit_request_hash_add(ctx, req); in nxt_unit_process_req_headers()
1414 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) in nxt_unit_process_req_body() argument
1421 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); in nxt_unit_process_req_body()
1450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_body()
1471 nxt_unit_ctx_t *ctx; in nxt_unit_request_check_response_port() local
1479 ctx = req->ctx; in nxt_unit_request_check_response_port()
1480 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_check_response_port()
1481 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_check_response_port()
1494 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", in nxt_unit_request_check_response_port()
1500 nxt_unit_debug(ctx, "check_response_port: " in nxt_unit_request_check_response_port()
1516 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); in nxt_unit_request_check_response_port()
1518 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", in nxt_unit_request_check_response_port()
1535 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", in nxt_unit_request_check_response_port()
1540 nxt_unit_free(ctx, port); in nxt_unit_request_check_response_port()
1547 nxt_unit_alert(ctx, "check_response_port: process %d not found", in nxt_unit_request_check_response_port()
1554 nxt_unit_free(ctx, port); in nxt_unit_request_check_response_port()
1579 res = nxt_unit_get_port(ctx, port_id); in nxt_unit_request_check_response_port()
1599 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_req_headers_ack()
1600 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_send_req_headers_ack()
1610 res = nxt_unit_port_send(req->ctx, req->response_port, in nxt_unit_send_req_headers_ack()
1621 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) in nxt_unit_process_websocket() argument
1631 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); in nxt_unit_process_websocket()
1638 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_websocket()
1642 ws_impl = nxt_unit_websocket_frame_get(ctx); in nxt_unit_process_websocket()
1644 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", in nxt_unit_process_websocket()
1667 b = nxt_unit_mmap_buf_get(ctx); in nxt_unit_process_websocket()
1669 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", in nxt_unit_process_websocket()
1727 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) in nxt_unit_process_shm_ack() argument
1732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_shm_ack()
1736 cb->shm_ack_handler(ctx); in nxt_unit_process_shm_ack()
1744 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) in nxt_unit_request_info_get() argument
1751 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_info_get()
1753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_info_get()
1760 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t) in nxt_unit_request_info_get()
1766 req_impl->req.unit = ctx->unit; in nxt_unit_request_info_get()
1767 req_impl->req.ctx = ctx; in nxt_unit_request_info_get()
1791 nxt_unit_ctx_t *ctx; in nxt_unit_request_info_release() local
1795 ctx = req->ctx; in nxt_unit_request_info_release()
1796 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_info_release()
1803 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); in nxt_unit_request_info_release()
1836 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { in nxt_unit_request_info_release()
1837 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); in nxt_unit_request_info_release()
1847 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_info_free()
1852 nxt_unit_free(&ctx_impl->ctx, req_impl); in nxt_unit_request_info_free()
1858 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) in nxt_unit_websocket_frame_get() argument
1864 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_websocket_frame_get()
1871 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t)); in nxt_unit_websocket_frame_get()
1913 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, in nxt_unit_websocket_frame_free() argument
1918 nxt_unit_free(ctx, ws_impl); in nxt_unit_websocket_frame_free()
2420 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); in nxt_unit_response_buf_alloc()
2431 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, in nxt_unit_response_buf_alloc()
2447 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) in nxt_unit_mmap_buf_get() argument
2452 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_mmap_buf_get()
2459 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); in nxt_unit_mmap_buf_get()
2527 rc = nxt_unit_request_hash_add(req->ctx, req); in nxt_unit_response_upgrade()
2646 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_buf_send()
2670 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", in nxt_unit_mmap_buf_send()
2676 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), in nxt_unit_mmap_buf_send()
2706 nxt_unit_debug(req->ctx, "allocated_chunks %d", in nxt_unit_mmap_buf_send()
2713 nxt_unit_alert(req->ctx, in nxt_unit_mmap_buf_send()
2723 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", in nxt_unit_mmap_buf_send()
2727 res = nxt_unit_port_send(req->ctx, req->response_port, in nxt_unit_mmap_buf_send()
2766 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, in nxt_unit_free_outgoing_buf()
2776 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); in nxt_unit_free_outgoing_buf()
2784 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) in nxt_unit_read_buf_get() argument
2789 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_read_buf_get()
2818 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); in nxt_unit_read_buf_get_impl()
2829 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, in nxt_unit_read_buf_release() argument
2834 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_read_buf_release()
2937 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, in nxt_unit_response_write_nb()
3036 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, in nxt_unit_response_write_cb()
3168 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); in nxt_unit_request_preread()
3174 mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size); in nxt_unit_request_preread()
3313 (void) nxt_unit_port_send(req->ctx, req->response_port, in nxt_unit_request_done()
3352 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, in nxt_unit_websocket_sendv()
3395 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, in nxt_unit_websocket_sendv()
3456 b = nxt_unit_malloc(ws->req->ctx, size); in nxt_unit_websocket_retain()
3492 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_mmap_get() argument
3501 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_get()
3576 res = nxt_unit_send_oosm(ctx, port); in nxt_unit_mmap_get()
3587 nxt_unit_debug(ctx, "oosm: waiting for ACK"); in nxt_unit_mmap_get()
3589 res = nxt_unit_wait_shm_ack(ctx); in nxt_unit_mmap_get()
3594 nxt_unit_debug(ctx, "oosm: retry"); in nxt_unit_mmap_get()
3602 hdr = nxt_unit_new_mmap(ctx, port, *n); in nxt_unit_mmap_get()
3608 nxt_unit_debug(ctx, "allocated_chunks %d", in nxt_unit_mmap_get()
3618 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) in nxt_unit_send_oosm() argument
3624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_oosm()
3635 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_oosm()
3645 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) in nxt_unit_wait_shm_ack() argument
3651 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_wait_shm_ack()
3654 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_wait_shm_ack()
3660 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); in nxt_unit_wait_shm_ack()
3664 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_wait_shm_ack()
3670 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_wait_shm_ack()
3681 nxt_unit_debug(ctx, "oosm: quit received"); in nxt_unit_wait_shm_ack()
3745 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) in nxt_unit_new_mmap() argument
3753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_new_mmap()
3757 nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); in nxt_unit_new_mmap()
3762 fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); in nxt_unit_new_mmap()
3769 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, in nxt_unit_new_mmap()
3800 rc = nxt_unit_send_mmap(ctx, port, fd); in nxt_unit_new_mmap()
3806 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", in nxt_unit_new_mmap()
3827 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) in nxt_unit_shm_open() argument
3835 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_shm_open()
3844 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, in nxt_unit_shm_open()
3850 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); in nxt_unit_shm_open()
3856 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", in nxt_unit_shm_open()
3869 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, in nxt_unit_shm_open()
3876 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, in nxt_unit_shm_open()
3887 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, in nxt_unit_shm_open()
3900 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) in nxt_unit_send_mmap() argument
3908 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_mmap()
3921 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob); in nxt_unit_send_mmap()
3931 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_get_outgoing_buf() argument
3945 mmap_buf->free_ptr = nxt_unit_malloc(ctx, in nxt_unit_get_outgoing_buf()
3959 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", in nxt_unit_get_outgoing_buf()
3968 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); in nxt_unit_get_outgoing_buf()
3988 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_get_outgoing_buf()
3990 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", in nxt_unit_get_outgoing_buf()
3999 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) in nxt_unit_incoming_mmap() argument
4011 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_incoming_mmap()
4013 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); in nxt_unit_incoming_mmap()
4016 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, in nxt_unit_incoming_mmap()
4025 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", in nxt_unit_incoming_mmap()
4035 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " in nxt_unit_incoming_mmap()
4050 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); in nxt_unit_incoming_mmap()
4081 nxt_unit_awake_ctx(ctx, ctx_impl); in nxt_unit_incoming_mmap()
4090 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) in nxt_unit_awake_ctx() argument
4094 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { in nxt_unit_awake_ctx()
4101 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); in nxt_unit_awake_ctx()
4110 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, in nxt_unit_awake_ctx()
4169 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, in nxt_unit_check_rbuf_mmap() argument
4179 nxt_unit_alert(ctx, "failed to allocate mmap"); in nxt_unit_check_rbuf_mmap()
4200 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_check_rbuf_mmap()
4205 res = nxt_unit_get_mmap(ctx, pid, id); in nxt_unit_check_rbuf_mmap()
4216 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, in nxt_unit_mmap_read() argument
4229 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", in nxt_unit_mmap_read()
4242 b = nxt_unit_mmap_buf_get(ctx); in nxt_unit_mmap_read()
4244 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", in nxt_unit_mmap_read()
4261 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_read()
4268 res = nxt_unit_check_rbuf_mmap(ctx, mmaps, in nxt_unit_mmap_read()
4295 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", in nxt_unit_mmap_read()
4310 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) in nxt_unit_get_mmap() argument
4321 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_mmap()
4322 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_get_mmap()
4332 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); in nxt_unit_get_mmap()
4334 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_mmap()
4344 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, in nxt_unit_mmap_release() argument
4367 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_release()
4372 nxt_unit_debug(ctx, "allocated_chunks %d", in nxt_unit_mmap_release()
4380 nxt_unit_send_shm_ack(ctx, hdr->src_pid); in nxt_unit_mmap_release()
4386 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) in nxt_unit_send_shm_ack() argument
4392 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_shm_ack()
4403 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_shm_ack()
4448 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) in nxt_unit_process_get() argument
4454 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_get()
4465 process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t)); in nxt_unit_process_get()
4467 nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid); in nxt_unit_process_get()
4488 nxt_unit_alert(ctx, "process %d insert failed", (int) pid); in nxt_unit_process_get()
4490 nxt_unit_free(ctx, process); in nxt_unit_process_get()
4534 nxt_unit_run(nxt_unit_ctx_t *ctx) in nxt_unit_run() argument
4539 nxt_unit_ctx_use(ctx); in nxt_unit_run()
4541 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_run()
4546 rc = nxt_unit_run_once_impl(ctx); in nxt_unit_run()
4549 nxt_unit_quit(ctx, NXT_QUIT_NORMAL); in nxt_unit_run()
4554 nxt_unit_ctx_release(ctx); in nxt_unit_run()
4561 nxt_unit_run_once(nxt_unit_ctx_t *ctx) in nxt_unit_run_once() argument
4565 nxt_unit_ctx_use(ctx); in nxt_unit_run_once()
4567 rc = nxt_unit_run_once_impl(ctx); in nxt_unit_run_once()
4569 nxt_unit_ctx_release(ctx); in nxt_unit_run_once()
4576 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) in nxt_unit_run_once_impl() argument
4581 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_run_once_impl()
4586 rc = nxt_unit_read_buf(ctx, rbuf); in nxt_unit_run_once_impl()
4588 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_run_once_impl()
4593 rc = nxt_unit_process_msg(ctx, rbuf, NULL); in nxt_unit_run_once_impl()
4598 rc = nxt_unit_process_pending_rbuf(ctx); in nxt_unit_run_once_impl()
4603 nxt_unit_process_ready_req(ctx); in nxt_unit_run_once_impl()
4610 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) in nxt_unit_read_buf() argument
4619 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_read_buf()
4621 if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) { in nxt_unit_read_buf()
4622 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); in nxt_unit_read_buf()
4628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_read_buf()
4638 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", in nxt_unit_read_buf()
4644 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", in nxt_unit_read_buf()
4654 if (nxt_fast_path(nxt_unit_chk_ready(ctx))) { in nxt_unit_read_buf()
4655 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4683 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", in nxt_unit_read_buf()
4691 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]", in nxt_unit_read_buf()
4696 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); in nxt_unit_read_buf()
4705 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4713 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", in nxt_unit_read_buf()
4722 nxt_unit_chk_ready(nxt_unit_ctx_t *ctx) in nxt_unit_chk_ready() argument
4727 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_chk_ready()
4728 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_chk_ready()
4737 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) in nxt_unit_process_pending_rbuf() argument
4744 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_process_pending_rbuf()
4766 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL); in nxt_unit_process_pending_rbuf()
4769 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_process_pending_rbuf()
4775 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); in nxt_unit_process_pending_rbuf()
4783 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) in nxt_unit_process_ready_req() argument
4792 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_process_ready_req()
4812 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_process_ready_req()
4826 res = nxt_unit_request_hash_add(ctx, req); in nxt_unit_process_ready_req()
4851 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) in nxt_unit_run_ctx() argument
4857 nxt_unit_ctx_use(ctx); in nxt_unit_run_ctx()
4859 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_run_ctx()
4864 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_run_ctx()
4872 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); in nxt_unit_run_ctx()
4877 rc = nxt_unit_process_msg(ctx, rbuf, NULL); in nxt_unit_run_ctx()
4882 rc = nxt_unit_process_pending_rbuf(ctx); in nxt_unit_run_ctx()
4887 nxt_unit_process_ready_req(ctx); in nxt_unit_run_ctx()
4890 nxt_unit_ctx_release(ctx); in nxt_unit_run_ctx()
4953 nxt_unit_run_shared(nxt_unit_ctx_t *ctx) in nxt_unit_run_shared() argument
4959 nxt_unit_ctx_use(ctx); in nxt_unit_run_shared()
4961 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_run_shared()
4965 while (nxt_fast_path(nxt_unit_chk_ready(ctx))) { in nxt_unit_run_shared()
4966 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_run_shared()
4974 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_run_shared()
4980 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_run_shared()
4984 rc = nxt_unit_process_msg(ctx, rbuf, NULL); in nxt_unit_run_shared()
4990 nxt_unit_ctx_release(ctx); in nxt_unit_run_shared()
4997 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) in nxt_unit_dequeue_request() argument
5004 nxt_unit_ctx_use(ctx); in nxt_unit_dequeue_request()
5006 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_dequeue_request()
5010 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { in nxt_unit_dequeue_request()
5014 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_dequeue_request()
5019 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_dequeue_request()
5021 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_dequeue_request()
5025 (void) nxt_unit_process_msg(ctx, rbuf, &req); in nxt_unit_dequeue_request()
5029 nxt_unit_ctx_release(ctx); in nxt_unit_dequeue_request()
5036 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) in nxt_unit_process_port_msg() argument
5040 nxt_unit_ctx_use(ctx); in nxt_unit_process_port_msg()
5042 rc = nxt_unit_process_port_msg_impl(ctx, port); in nxt_unit_process_port_msg()
5044 nxt_unit_ctx_release(ctx); in nxt_unit_process_port_msg()
5051 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) in nxt_unit_process_port_msg_impl() argument
5057 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_port_msg_impl()
5059 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { in nxt_unit_process_port_msg_impl()
5063 rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_process_port_msg_impl()
5069 rc = nxt_unit_shared_port_recv(ctx, port, rbuf); in nxt_unit_process_port_msg_impl()
5072 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); in nxt_unit_process_port_msg_impl()
5076 nxt_unit_read_buf_release(ctx, rbuf); in nxt_unit_process_port_msg_impl()
5080 rc = nxt_unit_process_msg(ctx, rbuf, NULL); in nxt_unit_process_port_msg_impl()
5085 rc = nxt_unit_process_pending_rbuf(ctx); in nxt_unit_process_port_msg_impl()
5090 nxt_unit_process_ready_req(ctx); in nxt_unit_process_port_msg_impl()
5097 nxt_unit_done(nxt_unit_ctx_t *ctx) in nxt_unit_done() argument
5099 nxt_unit_ctx_release(ctx); in nxt_unit_done()
5104 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) in nxt_unit_ctx_alloc() argument
5113 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_alloc()
5115 new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t) in nxt_unit_ctx_alloc()
5118 nxt_unit_alert(ctx, "failed to allocate context"); in nxt_unit_ctx_alloc()
5125 nxt_unit_free(ctx, new_ctx); in nxt_unit_ctx_alloc()
5132 port = nxt_unit_create_port(&new_ctx->ctx); in nxt_unit_ctx_alloc()
5139 queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t)); in nxt_unit_ctx_alloc()
5147 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, in nxt_unit_ctx_alloc()
5158 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); in nxt_unit_ctx_alloc()
5165 return &new_ctx->ctx; in nxt_unit_ctx_alloc()
5173 nxt_unit_ctx_release(&new_ctx->ctx); in nxt_unit_ctx_alloc()
5188 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_free()
5205 nxt_unit_free(&ctx_impl->ctx, mmap_buf); in nxt_unit_ctx_free()
5218 nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl); in nxt_unit_ctx_free()
5225 nxt_unit_free(&ctx_impl->ctx, rbuf); in nxt_unit_ctx_free()
5243 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl); in nxt_unit_ctx_free()
5273 nxt_unit_create_port(nxt_unit_ctx_t *ctx) in nxt_unit_create_port() argument
5280 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_create_port()
5284 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", in nxt_unit_create_port()
5296 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); in nxt_unit_create_port()
5303 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); in nxt_unit_create_port()
5308 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", in nxt_unit_create_port()
5313 process = nxt_unit_process_get(ctx, lib->pid); in nxt_unit_create_port()
5333 port = nxt_unit_add_port(ctx, &new_port, NULL); in nxt_unit_create_port()
5344 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, in nxt_unit_send_port() argument
5357 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_port()
5376 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob); in nxt_unit_send_port()
5432 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) in nxt_unit_add_port() argument
5441 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_add_port()
5448 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " in nxt_unit_add_port()
5506 lib->callbacks.add_port(ctx, old_port); in nxt_unit_add_port()
5520 nxt_unit_process_awaiting_req(ctx, &awaiting_req); in nxt_unit_add_port()
5528 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", in nxt_unit_add_port()
5532 process = nxt_unit_process_get(ctx, port->id.pid); in nxt_unit_add_port()
5543 new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); in nxt_unit_add_port()
5545 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", in nxt_unit_add_port()
5555 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", in nxt_unit_add_port()
5558 nxt_unit_free(ctx, new_port); in nxt_unit_add_port()
5595 lib->callbacks.add_port(ctx, &new_port->port); in nxt_unit_add_port()
5610 nxt_unit_process_awaiting_req(ctx, &awaiting_req); in nxt_unit_add_port()
5618 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) in nxt_unit_process_awaiting_req() argument
5628 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, in nxt_unit_process_awaiting_req()
5629 ctx); in nxt_unit_process_awaiting_req()
5640 nxt_unit_awake_ctx(ctx, ctx_impl); in nxt_unit_process_awaiting_req()
5647 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, in nxt_unit_remove_port() argument
5666 lib->callbacks.remove_port(&lib->unit, ctx, port); in nxt_unit_remove_port()
5755 nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param) in nxt_unit_quit() argument
5769 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_quit()
5770 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_quit()
5772 nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready, in nxt_unit_quit()
5788 cb->remove_port(&lib->unit, ctx, lib->shared_port); in nxt_unit_quit()
5810 cb->quit(ctx); in nxt_unit_quit()
5832 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); in nxt_unit_quit()
5836 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { in nxt_unit_quit()
5850 if (ctx == &ctx_impl->ctx in nxt_unit_quit()
5857 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, in nxt_unit_quit()
5867 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) in nxt_unit_get_port() argument
5878 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_port()
5879 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_get_port()
5890 nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, in nxt_unit_get_port()
5893 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_port()
5903 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_port_send() argument
5913 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_send()
5921 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", in nxt_unit_port_send()
5927 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", in nxt_unit_port_send()
5937 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, in nxt_unit_port_send()
5940 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", in nxt_unit_port_send()
5945 ret = lib->callbacks.port_send(ctx, port, &msg, in nxt_unit_port_send()
5948 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", in nxt_unit_port_send()
5963 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", in nxt_unit_port_send()
5969 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", in nxt_unit_port_send()
5974 ret = lib->callbacks.port_send(ctx, port, buf, buf_size, in nxt_unit_port_send()
5978 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", in nxt_unit_port_send()
5983 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob); in nxt_unit_port_send()
5985 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", in nxt_unit_port_send()
5995 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, in nxt_unit_sendmsg() argument
6020 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", in nxt_unit_sendmsg()
6024 nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size, in nxt_unit_sendmsg()
6033 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_ctx_port_recv() argument
6054 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", in nxt_unit_ctx_port_recv()
6068 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", in nxt_unit_ctx_port_recv()
6075 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", in nxt_unit_ctx_port_recv()
6087 res = nxt_unit_port_recv(ctx, port, rbuf); in nxt_unit_ctx_port_recv()
6095 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", in nxt_unit_ctx_port_recv()
6101 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", in nxt_unit_ctx_port_recv()
6115 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", in nxt_unit_ctx_port_recv()
6120 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); in nxt_unit_ctx_port_recv()
6130 nxt_unit_alert(ctx, "too many port socket messages"); in nxt_unit_ctx_port_recv()
6154 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_shared_port_recv() argument
6164 res = nxt_unit_app_queue_recv(ctx, port, rbuf); in nxt_unit_shared_port_recv()
6171 res = nxt_unit_port_recv(ctx, port, rbuf); in nxt_unit_shared_port_recv()
6179 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", in nxt_unit_shared_port_recv()
6191 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_port_recv() argument
6199 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_recv()
6204 rbuf->size = lib->callbacks.port_recv(ctx, port, in nxt_unit_port_recv()
6208 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", in nxt_unit_port_recv()
6236 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", in nxt_unit_port_recv()
6242 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", in nxt_unit_port_recv()
6248 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); in nxt_unit_port_recv()
6268 nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, in nxt_unit_app_queue_recv() argument
6295 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_app_queue_recv()
6301 nxt_unit_debug(ctx, "request limit reached"); in nxt_unit_app_queue_recv()
6309 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, in nxt_unit_app_queue_recv()
6491 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, in nxt_unit_request_hash_add() argument
6515 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_hash_add()
6536 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) in nxt_unit_request_hash_find() argument
6549 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); in nxt_unit_request_hash_find()
6580 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) in nxt_unit_log() argument
6588 if (nxt_fast_path(ctx != NULL)) { in nxt_unit_log()
6589 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_log()
6633 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_req_log()
6746 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) in nxt_unit_malloc() argument
6754 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); in nxt_unit_malloc()
6758 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", in nxt_unit_malloc()
6767 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) in nxt_unit_free() argument
6770 nxt_unit_debug(ctx, "free(%p)", p); in nxt_unit_free()