Lines Matching refs:lib

45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
408 nxt_unit_impl_t *lib; member
433 nxt_unit_impl_t *lib; in nxt_unit_init() local
438 lib = nxt_unit_create(init); in nxt_unit_init()
439 if (nxt_slow_path(lib == NULL)) { in nxt_unit_init()
456 lib->log_fd = init->log_fd; in nxt_unit_init()
471 &lib->log_fd, &ready_stream, &shm_limit, in nxt_unit_init()
477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) in nxt_unit_init()
479 lib->request_limit = request_limit; in nxt_unit_init()
482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { in nxt_unit_init()
483 lib->shm_mmap_limit = 1; in nxt_unit_init()
486 lib->pid = read_port.id.pid; in nxt_unit_init()
487 nxt_unit_pid = lib->pid; in nxt_unit_init()
489 ctx = &lib->main_ctx.ctx; in nxt_unit_init()
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); in nxt_unit_init()
497 if (nxt_slow_path(lib->router_port == NULL)) { in nxt_unit_init()
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); in nxt_unit_init()
525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { in nxt_unit_init()
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); in nxt_unit_init()
551 if (nxt_slow_path(lib->shared_port == NULL)) { in nxt_unit_init()
579 nxt_unit_ctx_release(&lib->main_ctx.ctx); in nxt_unit_init()
589 nxt_unit_impl_t *lib; in nxt_unit_create() local
597 lib = nxt_unit_malloc(NULL, in nxt_unit_create()
599 if (nxt_slow_path(lib == NULL)) { in nxt_unit_create()
605 rc = pthread_mutex_init(&lib->mutex, NULL); in nxt_unit_create()
612 lib->unit.data = init->data; in nxt_unit_create()
613 lib->callbacks = init->callbacks; in nxt_unit_create()
615 lib->request_data_size = init->request_data_size; in nxt_unit_create()
616 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) in nxt_unit_create()
618 lib->request_limit = init->request_limit; in nxt_unit_create()
620 lib->processes.slot = NULL; in nxt_unit_create()
621 lib->ports.slot = NULL; in nxt_unit_create()
623 lib->log_fd = STDERR_FILENO; in nxt_unit_create()
625 nxt_queue_init(&lib->contexts); in nxt_unit_create()
627 lib->use_count = 0; in nxt_unit_create()
628 lib->request_count = 0; in nxt_unit_create()
629 lib->router_port = NULL; in nxt_unit_create()
630 lib->shared_port = NULL; in nxt_unit_create()
632 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); in nxt_unit_create()
637 rc = nxt_unit_mmaps_init(&lib->incoming); in nxt_unit_create()
644 rc = nxt_unit_mmaps_init(&lib->outgoing); in nxt_unit_create()
651 return lib; in nxt_unit_create()
654 nxt_unit_mmaps_destroy(&lib->incoming); in nxt_unit_create()
657 nxt_unit_ctx_free(&lib->main_ctx); in nxt_unit_create()
660 pthread_mutex_destroy(&lib->mutex); in nxt_unit_create()
663 nxt_unit_free(NULL, lib); in nxt_unit_create()
670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, in nxt_unit_ctx_init() argument
676 ctx_impl->ctx.unit = &lib->unit; in nxt_unit_ctx_init()
685 nxt_unit_lib_use(lib); in nxt_unit_ctx_init()
687 pthread_mutex_lock(&lib->mutex); in nxt_unit_ctx_init()
689 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); in nxt_unit_ctx_init()
691 pthread_mutex_unlock(&lib->mutex); in nxt_unit_ctx_init()
716 ctx_impl->req.req.unit = &lib->unit; in nxt_unit_ctx_init()
753 nxt_unit_lib_use(nxt_unit_impl_t *lib) in nxt_unit_lib_use() argument
755 nxt_atomic_fetch_add(&lib->use_count, 1); in nxt_unit_lib_use()
760 nxt_unit_lib_release(nxt_unit_impl_t *lib) in nxt_unit_lib_release() argument
765 c = nxt_atomic_fetch_add(&lib->use_count, -1); in nxt_unit_lib_release()
769 pthread_mutex_lock(&lib->mutex); in nxt_unit_lib_release()
771 process = nxt_unit_process_pop_first(lib); in nxt_unit_lib_release()
773 pthread_mutex_unlock(&lib->mutex); in nxt_unit_lib_release()
778 nxt_unit_remove_process(lib, process); in nxt_unit_lib_release()
781 pthread_mutex_destroy(&lib->mutex); in nxt_unit_lib_release()
783 if (nxt_fast_path(lib->router_port != NULL)) { in nxt_unit_lib_release()
784 nxt_unit_port_release(lib->router_port); in nxt_unit_lib_release()
787 if (nxt_fast_path(lib->shared_port != NULL)) { in nxt_unit_lib_release()
788 nxt_unit_port_release(lib->shared_port); in nxt_unit_lib_release()
791 nxt_unit_mmaps_destroy(&lib->incoming); in nxt_unit_lib_release()
792 nxt_unit_mmaps_destroy(&lib->outgoing); in nxt_unit_lib_release()
794 nxt_unit_free(NULL, lib); in nxt_unit_lib_release()
947 nxt_unit_impl_t *lib; in nxt_unit_ready() local
950 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ready()
953 msg.pid = lib->pid; in nxt_unit_ready()
980 nxt_unit_impl_t *lib; in nxt_unit_process_msg() local
983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_msg()
1088 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { in nxt_unit_process_msg()
1090 port_msg->stream, recv_msg.fd[0], lib->log_fd, in nxt_unit_process_msg()
1139 nxt_unit_remove_pid(lib, pid); in nxt_unit_process_msg()
1246 nxt_unit_impl_t *lib; in nxt_unit_ctx_ready() local
1257 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_ready()
1260 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { in nxt_unit_ctx_ready()
1261 return lib->callbacks.ready_handler(ctx); in nxt_unit_ctx_ready()
1264 if (&lib->main_ctx != ctx_impl) { in nxt_unit_ctx_ready()
1266 if (nxt_slow_path(!lib->main_ctx.ready)) { in nxt_unit_ctx_ready()
1269 nxt_unit_quit(ctx, lib->main_ctx.quit_param); in nxt_unit_ctx_ready()
1274 if (lib->callbacks.add_port != NULL) { in nxt_unit_ctx_ready()
1275 lib->callbacks.add_port(ctx, lib->shared_port); in nxt_unit_ctx_ready()
1288 nxt_unit_impl_t *lib; in nxt_unit_process_req_headers() local
1378 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_headers()
1396 if (lib->callbacks.data_handler == NULL) { in nxt_unit_process_req_headers()
1402 lib->callbacks.request_handler(req); in nxt_unit_process_req_headers()
1417 nxt_unit_impl_t *lib; in nxt_unit_process_req_body() local
1450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_body()
1452 if (lib->callbacks.data_handler != NULL) { in nxt_unit_process_req_body()
1453 lib->callbacks.data_handler(req); in nxt_unit_process_req_body()
1459 lib->callbacks.request_handler(req); in nxt_unit_process_req_body()
1472 nxt_unit_impl_t *lib; in nxt_unit_request_check_response_port() local
1480 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_check_response_port()
1483 pthread_mutex_lock(&lib->mutex); in nxt_unit_request_check_response_port()
1485 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); in nxt_unit_request_check_response_port()
1492 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1509 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1521 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1533 res = nxt_unit_port_hash_add(&lib->ports, port); in nxt_unit_request_check_response_port()
1538 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1545 process = nxt_unit_process_find(lib, port_id->pid, 0); in nxt_unit_request_check_response_port()
1550 nxt_unit_port_hash_find(&lib->ports, port_id, 1); in nxt_unit_request_check_response_port()
1552 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1577 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1595 nxt_unit_impl_t *lib; in nxt_unit_send_req_headers_ack() local
1599 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_req_headers_ack()
1606 msg.pid = lib->pid; in nxt_unit_send_req_headers_ack()
1624 nxt_unit_impl_t *lib; in nxt_unit_process_websocket() local
1638 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_websocket()
1639 cb = &lib->callbacks; in nxt_unit_process_websocket()
1729 nxt_unit_impl_t *lib; in nxt_unit_process_shm_ack() local
1732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_shm_ack()
1733 cb = &lib->callbacks; in nxt_unit_process_shm_ack()
1746 nxt_unit_impl_t *lib; in nxt_unit_request_info_get() local
1753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_info_get()
1761 + lib->request_data_size); in nxt_unit_request_info_get()
1782 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; in nxt_unit_request_info_get()
2641 nxt_unit_impl_t *lib; in nxt_unit_mmap_buf_send() local
2645 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_buf_send()
2654 m.msg.pid = lib->pid; in nxt_unit_mmap_buf_send()
2702 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, in nxt_unit_mmap_buf_send()
2706 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_buf_send()
3261 nxt_unit_impl_t *lib; in nxt_unit_request_done() local
3300 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); in nxt_unit_request_done()
3303 msg.pid = lib->pid; in nxt_unit_request_done()
3497 nxt_unit_impl_t *lib; in nxt_unit_mmap_get() local
3500 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_get()
3502 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3506 outgoing_size = lib->outgoing.size; in nxt_unit_mmap_get()
3508 mm_end = lib->outgoing.elts + outgoing_size; in nxt_unit_mmap_get()
3510 for (mm = lib->outgoing.elts; mm < mm_end; mm++) { in nxt_unit_mmap_get()
3558 if (outgoing_size >= lib->shm_mmap_limit) { in nxt_unit_mmap_get()
3560 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3566 if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n in nxt_unit_mmap_get()
3567 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) in nxt_unit_mmap_get()
3595 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3605 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n); in nxt_unit_mmap_get()
3608 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_get()
3610 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3621 nxt_unit_impl_t *lib; in nxt_unit_send_oosm() local
3623 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_oosm()
3626 msg.pid = lib->pid; in nxt_unit_send_oosm()
3634 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_oosm()
3749 nxt_unit_impl_t *lib; in nxt_unit_new_mmap() local
3752 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_new_mmap()
3754 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size); in nxt_unit_new_mmap()
3782 hdr->id = lib->outgoing.size - 1; in nxt_unit_new_mmap()
3783 hdr->src_pid = lib->pid; in nxt_unit_new_mmap()
3797 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_new_mmap()
3806 hdr->id, (int) lib->pid, (int) port->id.pid); in nxt_unit_new_mmap()
3811 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_new_mmap()
3819 lib->outgoing.size--; in nxt_unit_new_mmap()
3832 nxt_unit_impl_t *lib; in nxt_unit_shm_open() local
3834 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_shm_open()
3836 lib->pid, (void *) (uintptr_t) pthread_self()); in nxt_unit_shm_open()
3904 nxt_unit_impl_t *lib; in nxt_unit_send_mmap() local
3907 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_mmap()
3910 msg.pid = lib->pid; in nxt_unit_send_mmap()
4005 nxt_unit_impl_t *lib; in nxt_unit_incoming_mmap() local
4010 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_incoming_mmap()
4036 (int) pid, (int) hdr->dst_pid, (int) lib->pid); in nxt_unit_incoming_mmap()
4045 pthread_mutex_lock(&lib->incoming.mutex); in nxt_unit_incoming_mmap()
4047 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); in nxt_unit_incoming_mmap()
4066 pthread_mutex_unlock(&lib->incoming.mutex); in nxt_unit_incoming_mmap()
4221 nxt_unit_impl_t *lib; in nxt_unit_mmap_read() local
4260 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_read()
4262 mmaps = &lib->incoming; in nxt_unit_mmap_read()
4312 nxt_unit_impl_t *lib; in nxt_unit_get_mmap() local
4320 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_mmap()
4325 m.msg.pid = lib->pid; in nxt_unit_get_mmap()
4333 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_mmap()
4349 nxt_unit_impl_t *lib; in nxt_unit_mmap_release() local
4366 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_release()
4368 if (hdr->src_pid == lib->pid && freed_chunks != 0) { in nxt_unit_mmap_release()
4369 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); in nxt_unit_mmap_release()
4372 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_release()
4375 if (hdr->dst_pid == lib->pid in nxt_unit_mmap_release()
4389 nxt_unit_impl_t *lib; in nxt_unit_send_shm_ack() local
4391 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_shm_ack()
4394 msg.pid = lib->pid; in nxt_unit_send_shm_ack()
4402 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_shm_ack()
4449 nxt_unit_impl_t *lib; in nxt_unit_process_get() local
4453 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_get()
4457 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { in nxt_unit_process_get()
4474 process->lib = lib; in nxt_unit_process_get()
4481 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { in nxt_unit_process_get()
4499 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) in nxt_unit_process_find() argument
4507 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); in nxt_unit_process_find()
4510 rc = nxt_lvlhsh_find(&lib->processes, &lhq); in nxt_unit_process_find()
4526 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) in nxt_unit_process_pop_first() argument
4528 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); in nxt_unit_process_pop_first()
4613 nxt_unit_impl_t *lib; in nxt_unit_read_buf() local
4627 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_read_buf()
4654 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4659 fds[1].fd = lib->shared_port->in_fd; in nxt_unit_read_buf()
4704 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4723 nxt_unit_impl_t *lib; in nxt_unit_chk_ready() local
4727 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_chk_ready()
4730 && (lib->request_limit == 0 in nxt_unit_chk_ready()
4731 || lib->request_count < lib->request_limit)); in nxt_unit_chk_ready()
4786 nxt_unit_impl_t *lib; in nxt_unit_process_ready_req() local
4811 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_process_ready_req()
4838 if (lib->callbacks.data_handler == NULL) { in nxt_unit_process_ready_req()
4843 lib->callbacks.request_handler(&req_impl->req); in nxt_unit_process_ready_req()
4955 nxt_unit_impl_t *lib; in nxt_unit_run_shared() local
4960 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_run_shared()
4973 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_run_shared()
4999 nxt_unit_impl_t *lib; in nxt_unit_dequeue_request() local
5005 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_dequeue_request()
5018 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_dequeue_request()
5053 nxt_unit_impl_t *lib; in nxt_unit_process_port_msg_impl() local
5056 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_port_msg_impl()
5058 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { in nxt_unit_process_port_msg_impl()
5067 if (port == lib->shared_port) { in nxt_unit_process_port_msg_impl()
5107 nxt_unit_impl_t *lib; in nxt_unit_ctx_alloc() local
5112 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_alloc()
5115 + lib->request_data_size); in nxt_unit_ctx_alloc()
5122 rc = nxt_unit_ctx_init(lib, new_ctx, data); in nxt_unit_ctx_alloc()
5157 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); in nxt_unit_ctx_alloc()
5181 nxt_unit_impl_t *lib; in nxt_unit_ctx_free() local
5187 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_free()
5230 pthread_mutex_lock(&lib->mutex); in nxt_unit_ctx_free()
5234 pthread_mutex_unlock(&lib->mutex); in nxt_unit_ctx_free()
5237 nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id); in nxt_unit_ctx_free()
5241 if (ctx_impl != &lib->main_ctx) { in nxt_unit_ctx_free()
5242 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl); in nxt_unit_ctx_free()
5245 nxt_unit_lib_release(lib); in nxt_unit_ctx_free()
5275 nxt_unit_impl_t *lib; in nxt_unit_create_port() local
5279 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_create_port()
5310 pthread_mutex_lock(&lib->mutex); in nxt_unit_create_port()
5312 process = nxt_unit_process_get(ctx, lib->pid); in nxt_unit_create_port()
5314 pthread_mutex_unlock(&lib->mutex); in nxt_unit_create_port()
5322 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); in nxt_unit_create_port()
5328 pthread_mutex_unlock(&lib->mutex); in nxt_unit_create_port()
5348 nxt_unit_impl_t *lib; in nxt_unit_send_port() local
5356 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_port()
5359 m.msg.pid = lib->pid; in nxt_unit_send_port()
5435 nxt_unit_impl_t *lib; in nxt_unit_add_port() local
5440 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_add_port()
5442 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5444 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); in nxt_unit_add_port()
5493 if (lib->callbacks.add_port == NULL && ready) { in nxt_unit_add_port()
5502 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5504 if (lib->callbacks.add_port != NULL && ready) { in nxt_unit_add_port()
5505 lib->callbacks.add_port(ctx, old_port); in nxt_unit_add_port()
5507 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5516 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5552 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); in nxt_unit_add_port()
5576 if (lib->callbacks.add_port == NULL) { in nxt_unit_add_port()
5587 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5593 if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { in nxt_unit_add_port()
5594 lib->callbacks.add_port(ctx, &new_port->port); in nxt_unit_add_port()
5598 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5607 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5646 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, in nxt_unit_remove_port() argument
5652 pthread_mutex_lock(&lib->mutex); in nxt_unit_remove_port()
5654 port = nxt_unit_remove_port_unsafe(lib, port_id); in nxt_unit_remove_port()
5662 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_port()
5664 if (lib->callbacks.remove_port != NULL && port != NULL) { in nxt_unit_remove_port()
5665 lib->callbacks.remove_port(&lib->unit, ctx, port); in nxt_unit_remove_port()
5675 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) in nxt_unit_remove_port_unsafe() argument
5679 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); in nxt_unit_remove_port_unsafe()
5696 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) in nxt_unit_remove_pid() argument
5700 pthread_mutex_lock(&lib->mutex); in nxt_unit_remove_pid()
5702 process = nxt_unit_process_find(lib, pid, 1); in nxt_unit_remove_pid()
5706 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_pid()
5711 nxt_unit_remove_process(lib, process); in nxt_unit_remove_pid()
5713 if (lib->callbacks.remove_pid != NULL) { in nxt_unit_remove_pid()
5714 lib->callbacks.remove_pid(&lib->unit, pid); in nxt_unit_remove_pid()
5720 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) in nxt_unit_remove_process() argument
5731 nxt_unit_remove_port_unsafe(lib, &port->port.id); in nxt_unit_remove_process()
5735 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_process()
5741 if (lib->callbacks.remove_port != NULL) { in nxt_unit_remove_process()
5742 lib->callbacks.remove_port(&lib->unit, NULL, &port->port); in nxt_unit_remove_process()
5757 nxt_unit_impl_t *lib; in nxt_unit_quit() local
5768 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_quit()
5781 cb = &lib->callbacks; in nxt_unit_quit()
5787 cb->remove_port(&lib->unit, ctx, lib->shared_port); in nxt_unit_quit()
5831 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); in nxt_unit_quit()
5835 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { in nxt_unit_quit()
5841 m.msg.pid = lib->pid; in nxt_unit_quit()
5845 pthread_mutex_lock(&lib->mutex); in nxt_unit_quit()
5847 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { in nxt_unit_quit()
5861 pthread_mutex_unlock(&lib->mutex); in nxt_unit_quit()
5869 nxt_unit_impl_t *lib; in nxt_unit_get_port() local
5877 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_port()
5882 m.msg.pid = lib->pid; in nxt_unit_get_port()
5892 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_port()
5909 nxt_unit_impl_t *lib; in nxt_unit_port_send() local
5912 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_send()
5935 if (lib->callbacks.port_send == NULL) { in nxt_unit_port_send()
5944 ret = lib->callbacks.port_send(ctx, port, &msg, in nxt_unit_port_send()
5972 if (lib->callbacks.port_send != NULL) { in nxt_unit_port_send()
5973 ret = lib->callbacks.port_send(ctx, port, buf, buf_size, in nxt_unit_port_send()
6196 nxt_unit_impl_t *lib; in nxt_unit_port_recv() local
6198 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_recv()
6200 if (lib->callbacks.port_recv != NULL) { in nxt_unit_port_recv()
6203 rbuf->size = lib->callbacks.port_recv(ctx, port, in nxt_unit_port_recv()
6273 nxt_unit_impl_t *lib; in nxt_unit_app_queue_recv() local
6294 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_app_queue_recv()
6296 if (lib->request_limit != 0) { in nxt_unit_app_queue_recv()
6297 nxt_atomic_fetch_add(&lib->request_count, 1); in nxt_unit_app_queue_recv()
6299 if (nxt_slow_path(lib->request_count >= lib->request_limit)) { in nxt_unit_app_queue_recv()
6304 m.msg.pid = lib->pid; in nxt_unit_app_queue_recv()
6308 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, in nxt_unit_app_queue_recv()
6585 nxt_unit_impl_t *lib; in nxt_unit_log() local
6588 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_log()
6590 pid = lib->pid; in nxt_unit_log()
6591 log_fd = lib->log_fd; in nxt_unit_log()
6628 nxt_unit_impl_t *lib; in nxt_unit_req_log() local
6632 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_req_log()
6634 pid = lib->pid; in nxt_unit_req_log()
6635 log_fd = lib->log_fd; in nxt_unit_req_log()