Lines Matching refs:lib

45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
408 nxt_unit_impl_t *lib; member
433 nxt_unit_impl_t *lib; in nxt_unit_init() local
438 lib = nxt_unit_create(init); in nxt_unit_init()
439 if (nxt_slow_path(lib == NULL)) { in nxt_unit_init()
456 lib->log_fd = init->log_fd; in nxt_unit_init()
471 &lib->log_fd, &ready_stream, &shm_limit, in nxt_unit_init()
477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) in nxt_unit_init()
479 lib->request_limit = request_limit; in nxt_unit_init()
482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { in nxt_unit_init()
483 lib->shm_mmap_limit = 1; in nxt_unit_init()
486 lib->pid = read_port.id.pid; in nxt_unit_init()
487 nxt_unit_pid = lib->pid; in nxt_unit_init()
489 ctx = &lib->main_ctx.ctx; in nxt_unit_init()
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); in nxt_unit_init()
497 if (nxt_slow_path(lib->router_port == NULL)) { in nxt_unit_init()
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); in nxt_unit_init()
525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { in nxt_unit_init()
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); in nxt_unit_init()
551 if (nxt_slow_path(lib->shared_port == NULL)) { in nxt_unit_init()
579 nxt_unit_ctx_release(&lib->main_ctx.ctx); in nxt_unit_init()
589 nxt_unit_impl_t *lib; in nxt_unit_create() local
597 lib = nxt_unit_malloc(NULL, in nxt_unit_create()
599 if (nxt_slow_path(lib == NULL)) { in nxt_unit_create()
605 rc = pthread_mutex_init(&lib->mutex, NULL); in nxt_unit_create()
612 lib->unit.data = init->data; in nxt_unit_create()
613 lib->callbacks = init->callbacks; in nxt_unit_create()
615 lib->request_data_size = init->request_data_size; in nxt_unit_create()
616 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) in nxt_unit_create()
618 lib->request_limit = init->request_limit; in nxt_unit_create()
620 lib->processes.slot = NULL; in nxt_unit_create()
621 lib->ports.slot = NULL; in nxt_unit_create()
623 lib->log_fd = STDERR_FILENO; in nxt_unit_create()
625 nxt_queue_init(&lib->contexts); in nxt_unit_create()
627 lib->use_count = 0; in nxt_unit_create()
628 lib->request_count = 0; in nxt_unit_create()
629 lib->router_port = NULL; in nxt_unit_create()
630 lib->shared_port = NULL; in nxt_unit_create()
632 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); in nxt_unit_create()
637 rc = nxt_unit_mmaps_init(&lib->incoming); in nxt_unit_create()
644 rc = nxt_unit_mmaps_init(&lib->outgoing); in nxt_unit_create()
651 return lib; in nxt_unit_create()
654 nxt_unit_mmaps_destroy(&lib->incoming); in nxt_unit_create()
657 nxt_unit_ctx_free(&lib->main_ctx); in nxt_unit_create()
660 pthread_mutex_destroy(&lib->mutex); in nxt_unit_create()
663 nxt_unit_free(NULL, lib); in nxt_unit_create()
670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, in nxt_unit_ctx_init() argument
676 ctx_impl->ctx.unit = &lib->unit; in nxt_unit_ctx_init()
685 nxt_unit_lib_use(lib); in nxt_unit_ctx_init()
687 pthread_mutex_lock(&lib->mutex); in nxt_unit_ctx_init()
689 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); in nxt_unit_ctx_init()
691 pthread_mutex_unlock(&lib->mutex); in nxt_unit_ctx_init()
716 ctx_impl->req.req.unit = &lib->unit; in nxt_unit_ctx_init()
753 nxt_unit_lib_use(nxt_unit_impl_t *lib) in nxt_unit_lib_use() argument
755 nxt_atomic_fetch_add(&lib->use_count, 1); in nxt_unit_lib_use()
760 nxt_unit_lib_release(nxt_unit_impl_t *lib) in nxt_unit_lib_release() argument
765 c = nxt_atomic_fetch_add(&lib->use_count, -1); in nxt_unit_lib_release()
769 pthread_mutex_lock(&lib->mutex); in nxt_unit_lib_release()
771 process = nxt_unit_process_pop_first(lib); in nxt_unit_lib_release()
773 pthread_mutex_unlock(&lib->mutex); in nxt_unit_lib_release()
778 nxt_unit_remove_process(lib, process); in nxt_unit_lib_release()
781 pthread_mutex_destroy(&lib->mutex); in nxt_unit_lib_release()
783 if (nxt_fast_path(lib->router_port != NULL)) { in nxt_unit_lib_release()
784 nxt_unit_port_release(lib->router_port); in nxt_unit_lib_release()
787 if (nxt_fast_path(lib->shared_port != NULL)) { in nxt_unit_lib_release()
788 nxt_unit_port_release(lib->shared_port); in nxt_unit_lib_release()
791 nxt_unit_mmaps_destroy(&lib->incoming); in nxt_unit_lib_release()
792 nxt_unit_mmaps_destroy(&lib->outgoing); in nxt_unit_lib_release()
794 nxt_unit_free(NULL, lib); in nxt_unit_lib_release()
947 nxt_unit_impl_t *lib; in nxt_unit_ready() local
950 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ready()
953 msg.pid = lib->pid; in nxt_unit_ready()
980 nxt_unit_impl_t *lib; in nxt_unit_process_msg() local
983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_msg()
1088 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { in nxt_unit_process_msg()
1090 port_msg->stream, recv_msg.fd[0], lib->log_fd, in nxt_unit_process_msg()
1139 nxt_unit_remove_pid(lib, pid); in nxt_unit_process_msg()
1246 nxt_unit_impl_t *lib; in nxt_unit_ctx_ready() local
1257 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_ready()
1260 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { in nxt_unit_ctx_ready()
1261 return lib->callbacks.ready_handler(ctx); in nxt_unit_ctx_ready()
1264 if (&lib->main_ctx != ctx_impl) { in nxt_unit_ctx_ready()
1266 if (nxt_slow_path(!lib->main_ctx.ready)) { in nxt_unit_ctx_ready()
1269 nxt_unit_quit(ctx, lib->main_ctx.quit_param); in nxt_unit_ctx_ready()
1274 if (lib->callbacks.add_port != NULL) { in nxt_unit_ctx_ready()
1275 lib->callbacks.add_port(ctx, lib->shared_port); in nxt_unit_ctx_ready()
1288 nxt_unit_impl_t *lib; in nxt_unit_process_req_headers() local
1378 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_headers()
1396 if (lib->callbacks.data_handler == NULL) { in nxt_unit_process_req_headers()
1402 lib->callbacks.request_handler(req); in nxt_unit_process_req_headers()
1417 nxt_unit_impl_t *lib; in nxt_unit_process_req_body() local
1450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_req_body()
1452 if (lib->callbacks.data_handler != NULL) { in nxt_unit_process_req_body()
1453 lib->callbacks.data_handler(req); in nxt_unit_process_req_body()
1459 lib->callbacks.request_handler(req); in nxt_unit_process_req_body()
1472 nxt_unit_impl_t *lib; in nxt_unit_request_check_response_port() local
1480 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_check_response_port()
1483 pthread_mutex_lock(&lib->mutex); in nxt_unit_request_check_response_port()
1485 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); in nxt_unit_request_check_response_port()
1492 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1509 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1521 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1533 res = nxt_unit_port_hash_add(&lib->ports, port); in nxt_unit_request_check_response_port()
1538 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1545 process = nxt_unit_process_find(lib, port_id->pid, 0); in nxt_unit_request_check_response_port()
1550 nxt_unit_port_hash_find(&lib->ports, port_id, 1); in nxt_unit_request_check_response_port()
1552 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1577 pthread_mutex_unlock(&lib->mutex); in nxt_unit_request_check_response_port()
1595 nxt_unit_impl_t *lib; in nxt_unit_send_req_headers_ack() local
1599 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_req_headers_ack()
1606 msg.pid = lib->pid; in nxt_unit_send_req_headers_ack()
1624 nxt_unit_impl_t *lib; in nxt_unit_process_websocket() local
1638 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_websocket()
1639 cb = &lib->callbacks; in nxt_unit_process_websocket()
1729 nxt_unit_impl_t *lib; in nxt_unit_process_shm_ack() local
1732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_shm_ack()
1733 cb = &lib->callbacks; in nxt_unit_process_shm_ack()
1746 nxt_unit_impl_t *lib; in nxt_unit_request_info_get() local
1753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_request_info_get()
1761 + lib->request_data_size); in nxt_unit_request_info_get()
1782 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; in nxt_unit_request_info_get()
2642 nxt_unit_impl_t *lib; in nxt_unit_mmap_buf_send() local
2646 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_buf_send()
2655 m.msg.pid = lib->pid; in nxt_unit_mmap_buf_send()
2703 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, in nxt_unit_mmap_buf_send()
2707 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_buf_send()
3262 nxt_unit_impl_t *lib; in nxt_unit_request_done() local
3301 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); in nxt_unit_request_done()
3304 msg.pid = lib->pid; in nxt_unit_request_done()
3498 nxt_unit_impl_t *lib; in nxt_unit_mmap_get() local
3501 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_get()
3503 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3507 outgoing_size = lib->outgoing.size; in nxt_unit_mmap_get()
3509 mm_end = lib->outgoing.elts + outgoing_size; in nxt_unit_mmap_get()
3511 for (mm = lib->outgoing.elts; mm < mm_end; mm++) { in nxt_unit_mmap_get()
3559 if (outgoing_size >= lib->shm_mmap_limit) { in nxt_unit_mmap_get()
3561 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3567 if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n in nxt_unit_mmap_get()
3568 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) in nxt_unit_mmap_get()
3596 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3606 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n); in nxt_unit_mmap_get()
3609 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_get()
3611 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_mmap_get()
3622 nxt_unit_impl_t *lib; in nxt_unit_send_oosm() local
3624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_oosm()
3627 msg.pid = lib->pid; in nxt_unit_send_oosm()
3635 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_oosm()
3750 nxt_unit_impl_t *lib; in nxt_unit_new_mmap() local
3753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_new_mmap()
3755 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size); in nxt_unit_new_mmap()
3783 hdr->id = lib->outgoing.size - 1; in nxt_unit_new_mmap()
3784 hdr->src_pid = lib->pid; in nxt_unit_new_mmap()
3798 pthread_mutex_unlock(&lib->outgoing.mutex); in nxt_unit_new_mmap()
3807 hdr->id, (int) lib->pid, (int) port->id.pid); in nxt_unit_new_mmap()
3812 pthread_mutex_lock(&lib->outgoing.mutex); in nxt_unit_new_mmap()
3820 lib->outgoing.size--; in nxt_unit_new_mmap()
3833 nxt_unit_impl_t *lib; in nxt_unit_shm_open() local
3835 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_shm_open()
3837 lib->pid, (void *) (uintptr_t) pthread_self()); in nxt_unit_shm_open()
3905 nxt_unit_impl_t *lib; in nxt_unit_send_mmap() local
3908 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_mmap()
3911 msg.pid = lib->pid; in nxt_unit_send_mmap()
4006 nxt_unit_impl_t *lib; in nxt_unit_incoming_mmap() local
4011 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_incoming_mmap()
4037 (int) pid, (int) hdr->dst_pid, (int) lib->pid); in nxt_unit_incoming_mmap()
4046 pthread_mutex_lock(&lib->incoming.mutex); in nxt_unit_incoming_mmap()
4048 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); in nxt_unit_incoming_mmap()
4067 pthread_mutex_unlock(&lib->incoming.mutex); in nxt_unit_incoming_mmap()
4222 nxt_unit_impl_t *lib; in nxt_unit_mmap_read() local
4261 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_read()
4263 mmaps = &lib->incoming; in nxt_unit_mmap_read()
4313 nxt_unit_impl_t *lib; in nxt_unit_get_mmap() local
4321 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_mmap()
4326 m.msg.pid = lib->pid; in nxt_unit_get_mmap()
4334 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_mmap()
4350 nxt_unit_impl_t *lib; in nxt_unit_mmap_release() local
4367 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_mmap_release()
4369 if (hdr->src_pid == lib->pid && freed_chunks != 0) { in nxt_unit_mmap_release()
4370 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); in nxt_unit_mmap_release()
4373 (int) lib->outgoing.allocated_chunks); in nxt_unit_mmap_release()
4376 if (hdr->dst_pid == lib->pid in nxt_unit_mmap_release()
4390 nxt_unit_impl_t *lib; in nxt_unit_send_shm_ack() local
4392 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_shm_ack()
4395 msg.pid = lib->pid; in nxt_unit_send_shm_ack()
4403 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); in nxt_unit_send_shm_ack()
4450 nxt_unit_impl_t *lib; in nxt_unit_process_get() local
4454 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_get()
4458 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { in nxt_unit_process_get()
4475 process->lib = lib; in nxt_unit_process_get()
4482 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { in nxt_unit_process_get()
4500 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) in nxt_unit_process_find() argument
4508 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); in nxt_unit_process_find()
4511 rc = nxt_lvlhsh_find(&lib->processes, &lhq); in nxt_unit_process_find()
4527 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) in nxt_unit_process_pop_first() argument
4529 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); in nxt_unit_process_pop_first()
4614 nxt_unit_impl_t *lib; in nxt_unit_read_buf() local
4628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_read_buf()
4655 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4660 fds[1].fd = lib->shared_port->in_fd; in nxt_unit_read_buf()
4705 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_read_buf()
4724 nxt_unit_impl_t *lib; in nxt_unit_chk_ready() local
4728 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_chk_ready()
4731 && (lib->request_limit == 0 in nxt_unit_chk_ready()
4732 || lib->request_count < lib->request_limit)); in nxt_unit_chk_ready()
4787 nxt_unit_impl_t *lib; in nxt_unit_process_ready_req() local
4812 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_process_ready_req()
4839 if (lib->callbacks.data_handler == NULL) { in nxt_unit_process_ready_req()
4844 lib->callbacks.request_handler(&req_impl->req); in nxt_unit_process_ready_req()
4956 nxt_unit_impl_t *lib; in nxt_unit_run_shared() local
4961 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_run_shared()
4974 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); in nxt_unit_run_shared()
5000 nxt_unit_impl_t *lib; in nxt_unit_dequeue_request() local
5006 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_dequeue_request()
5019 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); in nxt_unit_dequeue_request()
5054 nxt_unit_impl_t *lib; in nxt_unit_process_port_msg_impl() local
5057 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_process_port_msg_impl()
5059 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { in nxt_unit_process_port_msg_impl()
5068 if (port == lib->shared_port) { in nxt_unit_process_port_msg_impl()
5108 nxt_unit_impl_t *lib; in nxt_unit_ctx_alloc() local
5113 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_alloc()
5116 + lib->request_data_size); in nxt_unit_ctx_alloc()
5123 rc = nxt_unit_ctx_init(lib, new_ctx, data); in nxt_unit_ctx_alloc()
5158 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); in nxt_unit_ctx_alloc()
5182 nxt_unit_impl_t *lib; in nxt_unit_ctx_free() local
5188 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); in nxt_unit_ctx_free()
5231 pthread_mutex_lock(&lib->mutex); in nxt_unit_ctx_free()
5235 pthread_mutex_unlock(&lib->mutex); in nxt_unit_ctx_free()
5238 nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id); in nxt_unit_ctx_free()
5242 if (ctx_impl != &lib->main_ctx) { in nxt_unit_ctx_free()
5243 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl); in nxt_unit_ctx_free()
5246 nxt_unit_lib_release(lib); in nxt_unit_ctx_free()
5276 nxt_unit_impl_t *lib; in nxt_unit_create_port() local
5280 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_create_port()
5311 pthread_mutex_lock(&lib->mutex); in nxt_unit_create_port()
5313 process = nxt_unit_process_get(ctx, lib->pid); in nxt_unit_create_port()
5315 pthread_mutex_unlock(&lib->mutex); in nxt_unit_create_port()
5323 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); in nxt_unit_create_port()
5329 pthread_mutex_unlock(&lib->mutex); in nxt_unit_create_port()
5349 nxt_unit_impl_t *lib; in nxt_unit_send_port() local
5357 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_send_port()
5360 m.msg.pid = lib->pid; in nxt_unit_send_port()
5436 nxt_unit_impl_t *lib; in nxt_unit_add_port() local
5441 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_add_port()
5443 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5445 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); in nxt_unit_add_port()
5494 if (lib->callbacks.add_port == NULL && ready) { in nxt_unit_add_port()
5503 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5505 if (lib->callbacks.add_port != NULL && ready) { in nxt_unit_add_port()
5506 lib->callbacks.add_port(ctx, old_port); in nxt_unit_add_port()
5508 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5517 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5553 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); in nxt_unit_add_port()
5577 if (lib->callbacks.add_port == NULL) { in nxt_unit_add_port()
5588 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5594 if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { in nxt_unit_add_port()
5595 lib->callbacks.add_port(ctx, &new_port->port); in nxt_unit_add_port()
5599 pthread_mutex_lock(&lib->mutex); in nxt_unit_add_port()
5608 pthread_mutex_unlock(&lib->mutex); in nxt_unit_add_port()
5647 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, in nxt_unit_remove_port() argument
5653 pthread_mutex_lock(&lib->mutex); in nxt_unit_remove_port()
5655 port = nxt_unit_remove_port_unsafe(lib, port_id); in nxt_unit_remove_port()
5663 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_port()
5665 if (lib->callbacks.remove_port != NULL && port != NULL) { in nxt_unit_remove_port()
5666 lib->callbacks.remove_port(&lib->unit, ctx, port); in nxt_unit_remove_port()
5676 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) in nxt_unit_remove_port_unsafe() argument
5680 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); in nxt_unit_remove_port_unsafe()
5697 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) in nxt_unit_remove_pid() argument
5701 pthread_mutex_lock(&lib->mutex); in nxt_unit_remove_pid()
5703 process = nxt_unit_process_find(lib, pid, 1); in nxt_unit_remove_pid()
5707 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_pid()
5712 nxt_unit_remove_process(lib, process); in nxt_unit_remove_pid()
5714 if (lib->callbacks.remove_pid != NULL) { in nxt_unit_remove_pid()
5715 lib->callbacks.remove_pid(&lib->unit, pid); in nxt_unit_remove_pid()
5721 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) in nxt_unit_remove_process() argument
5732 nxt_unit_remove_port_unsafe(lib, &port->port.id); in nxt_unit_remove_process()
5736 pthread_mutex_unlock(&lib->mutex); in nxt_unit_remove_process()
5742 if (lib->callbacks.remove_port != NULL) { in nxt_unit_remove_process()
5743 lib->callbacks.remove_port(&lib->unit, NULL, &port->port); in nxt_unit_remove_process()
5758 nxt_unit_impl_t *lib; in nxt_unit_quit() local
5769 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_quit()
5782 cb = &lib->callbacks; in nxt_unit_quit()
5788 cb->remove_port(&lib->unit, ctx, lib->shared_port); in nxt_unit_quit()
5832 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); in nxt_unit_quit()
5836 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { in nxt_unit_quit()
5842 m.msg.pid = lib->pid; in nxt_unit_quit()
5846 pthread_mutex_lock(&lib->mutex); in nxt_unit_quit()
5848 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { in nxt_unit_quit()
5862 pthread_mutex_unlock(&lib->mutex); in nxt_unit_quit()
5870 nxt_unit_impl_t *lib; in nxt_unit_get_port() local
5878 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_get_port()
5883 m.msg.pid = lib->pid; in nxt_unit_get_port()
5893 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); in nxt_unit_get_port()
5910 nxt_unit_impl_t *lib; in nxt_unit_port_send() local
5913 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_send()
5936 if (lib->callbacks.port_send == NULL) { in nxt_unit_port_send()
5945 ret = lib->callbacks.port_send(ctx, port, &msg, in nxt_unit_port_send()
5973 if (lib->callbacks.port_send != NULL) { in nxt_unit_port_send()
5974 ret = lib->callbacks.port_send(ctx, port, buf, buf_size, in nxt_unit_port_send()
6197 nxt_unit_impl_t *lib; in nxt_unit_port_recv() local
6199 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_port_recv()
6201 if (lib->callbacks.port_recv != NULL) { in nxt_unit_port_recv()
6204 rbuf->size = lib->callbacks.port_recv(ctx, port, in nxt_unit_port_recv()
6274 nxt_unit_impl_t *lib; in nxt_unit_app_queue_recv() local
6295 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_app_queue_recv()
6297 if (lib->request_limit != 0) { in nxt_unit_app_queue_recv()
6298 nxt_atomic_fetch_add(&lib->request_count, 1); in nxt_unit_app_queue_recv()
6300 if (nxt_slow_path(lib->request_count >= lib->request_limit)) { in nxt_unit_app_queue_recv()
6305 m.msg.pid = lib->pid; in nxt_unit_app_queue_recv()
6309 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, in nxt_unit_app_queue_recv()
6586 nxt_unit_impl_t *lib; in nxt_unit_log() local
6589 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_log()
6591 pid = lib->pid; in nxt_unit_log()
6592 log_fd = lib->log_fd; in nxt_unit_log()
6629 nxt_unit_impl_t *lib; in nxt_unit_req_log() local
6633 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); in nxt_unit_req_log()
6635 pid = lib->pid; in nxt_unit_req_log()
6636 log_fd = lib->log_fd; in nxt_unit_req_log()