xref: /unit/src/nxt_process.c (revision 1210:973269f705ba)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
#include <nxt_main.h>
#include <nxt_main_process.h>

#if (NXT_HAVE_CLONE)
#include <nxt_clone.h>
#endif

#include <errno.h>
#include <signal.h>
15 
16 static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
17 static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
18 static nxt_int_t nxt_process_worker_setup(nxt_task_t *task,
19     nxt_process_t *process, int parentfd);
20 
21 /* A cached process pid. */
22 nxt_pid_t  nxt_pid;
23 
24 /* An original parent process pid. */
25 nxt_pid_t  nxt_ppid;
26 
27 nxt_bool_t  nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
28     { 1, 1, 1, 1, 1 },
29     { 1, 0, 0, 0, 0 },
30     { 1, 0, 0, 1, 0 },
31     { 1, 0, 1, 0, 1 },
32     { 1, 0, 0, 0, 0 },
33 };
34 
35 nxt_bool_t  nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
36     { 0, 0, 0, 0, 0 },
37     { 0, 0, 0, 0, 0 },
38     { 0, 0, 0, 1, 0 },
39     { 0, 0, 1, 0, 1 },
40     { 0, 0, 0, 1, 0 },
41 };
42 
43 
44 static nxt_int_t
45 nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
46 {
47     pid_t               rpid, pid;
48     ssize_t             n;
49     nxt_int_t           parent_status;
50     nxt_process_t       *p;
51     nxt_runtime_t       *rt;
52     nxt_process_init_t  *init;
53     nxt_process_type_t  ptype;
54 
55     pid  = getpid();
56     rpid = 0;
57     rt   = task->thread->runtime;
58     init = process->init;
59 
60     /* Setup the worker process. */
61 
62     n = read(parentfd, &rpid, sizeof(rpid));
63     if (nxt_slow_path(n == -1 || n != sizeof(rpid))) {
64         nxt_alert(task, "failed to read real pid");
65         return NXT_ERROR;
66     }
67 
68     if (nxt_slow_path(rpid == 0)) {
69         nxt_alert(task, "failed to get real pid from parent");
70         return NXT_ERROR;
71     }
72 
73     nxt_pid = rpid;
74 
75     /* Clean inherited cached thread tid. */
76     task->thread->tid = 0;
77 
78     process->pid = nxt_pid;
79 
80     if (nxt_pid != pid) {
81         nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid);
82         nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid);
83     }
84 
85     n = read(parentfd, &parent_status, sizeof(parent_status));
86     if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) {
87         nxt_alert(task, "failed to read parent status");
88         return NXT_ERROR;
89     }
90 
91     if (nxt_slow_path(parent_status != NXT_OK)) {
92         return parent_status;
93     }
94 
95     ptype = init->type;
96 
97     nxt_port_reset_next_id();
98 
99     nxt_event_engine_thread_adopt(task->thread->engine);
100 
101     /* Remove not ready processes. */
102     nxt_runtime_process_each(rt, p) {
103 
104         if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0) {
105             nxt_debug(task, "remove not required process %PI", p->pid);
106 
107             nxt_process_close_ports(task, p);
108 
109             continue;
110         }
111 
112         if (!p->ready) {
113             nxt_debug(task, "remove not ready process %PI", p->pid);
114 
115             nxt_process_close_ports(task, p);
116 
117             continue;
118         }
119 
120         nxt_port_mmaps_destroy(&p->incoming, 0);
121         nxt_port_mmaps_destroy(&p->outgoing, 0);
122 
123     } nxt_runtime_process_loop;
124 
125     nxt_runtime_process_add(task, process);
126 
127     nxt_process_start(task, process);
128 
129     process->ready = 1;
130 
131     return NXT_OK;
132 }
133 
134 
135 nxt_pid_t
136 nxt_process_create(nxt_task_t *task, nxt_process_t *process)
137 {
138     int                 pipefd[2];
139     nxt_int_t           ret;
140     nxt_pid_t           pid;
141     nxt_process_init_t  *init;
142 
143     if (nxt_slow_path(pipe(pipefd) == -1)) {
144         nxt_alert(task, "failed to create process pipe for passing rpid");
145         return -1;
146     }
147 
148     init = process->init;
149 
150 #if (NXT_HAVE_CLONE)
151     pid = nxt_clone(SIGCHLD | init->isolation.clone.flags);
152     if (nxt_slow_path(pid < 0)) {
153         nxt_alert(task, "clone() failed while creating \"%s\" %E",
154                   init->name, nxt_errno);
155         goto cleanup;
156     }
157 #else
158     pid = fork();
159     if (nxt_slow_path(pid < 0)) {
160         nxt_alert(task, "fork() failed while creating \"%s\" %E",
161                   init->name, nxt_errno);
162         goto cleanup;
163     }
164 #endif
165 
166     if (pid == 0) {
167         /* Child. */
168 
169         if (nxt_slow_path(close(pipefd[1]) == -1)) {
170             nxt_alert(task, "failed to close writer pipe fd");
171         }
172 
173         ret = nxt_process_worker_setup(task, process, pipefd[0]);
174         if (nxt_slow_path(ret != NXT_OK)) {
175             exit(1);
176         }
177 
178         if (nxt_slow_path(close(pipefd[0]) == -1)) {
179             nxt_alert(task, "failed to close writer pipe fd");
180         }
181 
182         /*
183          * Explicitly return 0 to notice the caller function this is the child.
184          * The caller must return to the event engine work queue loop.
185          */
186         return 0;
187     }
188 
189     /* Parent. */
190 
191     /*
192      * At this point, the child process is blocked reading the
193      * pipe fd to get its real pid (rpid).
194      *
195      * If anything goes wrong now, we need to terminate the child
196      * process by sending a NXT_ERROR in the pipe.
197      */
198 
199 #if (NXT_HAVE_CLONE)
200     nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid);
201 #else
202     nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid);
203 #endif
204 
205     if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) {
206         nxt_alert(task, "failed to write real pid");
207         goto fail;
208     }
209 
210 #if (NXT_HAVE_CLONE_NEWUSER)
211     if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) {
212         ret = nxt_clone_proc_map(task, pid, &init->isolation.clone);
213         if (nxt_slow_path(ret != NXT_OK)) {
214             goto fail;
215         }
216     }
217 #endif
218 
219     ret = NXT_OK;
220 
221     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
222         nxt_alert(task, "failed to write status");
223         goto fail;
224     }
225 
226     process->pid = pid;
227 
228     nxt_runtime_process_add(task, process);
229 
230     goto cleanup;
231 
232 fail:
233 
234     ret = NXT_ERROR;
235 
236     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
237         nxt_alert(task, "failed to write status");
238     }
239 
240     waitpid(pid, NULL, 0);
241 
242     pid = -1;
243 
244 cleanup:
245 
246     if (nxt_slow_path(close(pipefd[0]) != 0)) {
247         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
248     }
249 
250     if (nxt_slow_path(close(pipefd[1]) != 0)) {
251         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
252     }
253 
254     return pid;
255 }
256 
257 
258 static void
259 nxt_process_start(nxt_task_t *task, nxt_process_t *process)
260 {
261     nxt_int_t                    ret;
262     nxt_port_t                   *port, *main_port;
263     nxt_thread_t                 *thread;
264     nxt_runtime_t                *rt;
265     nxt_process_init_t           *init;
266     nxt_event_engine_t           *engine;
267     const nxt_event_interface_t  *interface;
268 
269     init = process->init;
270 
271     nxt_log(task, NXT_LOG_INFO, "%s started", init->name);
272 
273     nxt_process_title(task, "unit: %s", init->name);
274 
275     thread = task->thread;
276     rt     = thread->runtime;
277 
278     nxt_random_init(&thread->random);
279 
280     if (rt->capabilities.setid && init->user_cred != NULL) {
281         ret = nxt_user_cred_set(task, init->user_cred);
282         if (ret != NXT_OK) {
283             goto fail;
284         }
285     }
286 
287     rt->type = init->type;
288 
289     engine = thread->engine;
290 
291     /* Update inherited main process event engine and signals processing. */
292     engine->signals->sigev = init->signals;
293 
294     interface = nxt_service_get(rt->services, "engine", rt->engine);
295     if (nxt_slow_path(interface == NULL)) {
296         goto fail;
297     }
298 
299     if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
300         goto fail;
301     }
302 
303     ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
304                                          60000 * 1000000LL);
305     if (nxt_slow_path(ret != NXT_OK)) {
306         goto fail;
307     }
308 
309     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
310 
311     nxt_port_read_close(main_port);
312     nxt_port_write_enable(task, main_port);
313 
314     port = nxt_process_port_first(process);
315 
316     nxt_port_write_close(port);
317 
318     ret = init->start(task, init->data);
319 
320     if (nxt_slow_path(ret != NXT_OK)) {
321         goto fail;
322     }
323 
324     nxt_port_enable(task, port, init->port_handlers);
325 
326     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
327                                 -1, init->stream, 0, NULL);
328 
329     if (nxt_slow_path(ret != NXT_OK)) {
330         nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main");
331 
332         goto fail;
333     }
334 
335     return;
336 
337 fail:
338 
339     exit(1);
340 }
341 
342 
343 #if (NXT_HAVE_POSIX_SPAWN)
344 
345 /*
346  * Linux glibc 2.2 posix_spawn() is implemented via fork()/execve().
347  * Linux glibc 2.4 posix_spawn() without file actions and spawn
348  * attributes uses vfork()/execve().
349  *
350  * On FreeBSD 8.0 posix_spawn() is implemented via vfork()/execve().
351  *
352  * Solaris 10:
353  *   In the Solaris 10 OS, posix_spawn() is currently implemented using
354  *   private-to-libc vfork(), execve(), and exit() functions.  They are
355  *   identical to regular vfork(), execve(), and exit() in functionality,
356  *   but they are not exported from libc and therefore don't cause the
357  *   deadlock-in-the-dynamic-linker problem that any multithreaded code
358  *   outside of libc that calls vfork() can cause.
359  *
360  * On MacOSX 10.5 (Leopard) and NetBSD 6.0 posix_spawn() is implemented
361  * as a syscall.
362  */
363 
364 nxt_pid_t
365 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
366 {
367     nxt_pid_t  pid;
368 
369     nxt_debug(task, "posix_spawn(\"%s\")", name);
370 
371     if (posix_spawn(&pid, name, NULL, NULL, argv, envp) != 0) {
372         nxt_alert(task, "posix_spawn(\"%s\") failed %E", name, nxt_errno);
373         return -1;
374     }
375 
376     return pid;
377 }
378 
379 #else
380 
381 nxt_pid_t
382 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
383 {
384     nxt_pid_t  pid;
385 
386     /*
387      * vfork() is better than fork() because:
388      *   it is faster several times;
389      *   its execution time does not depend on private memory mapping size;
390      *   it has lesser chances to fail due to the ENOMEM error.
391      */
392 
393     pid = vfork();
394 
395     switch (pid) {
396 
397     case -1:
398         nxt_alert(task, "vfork() failed while executing \"%s\" %E",
399                   name, nxt_errno);
400         break;
401 
402     case 0:
403         /* A child. */
404         nxt_debug(task, "execve(\"%s\")", name);
405 
406         (void) execve(name, argv, envp);
407 
408         nxt_alert(task, "execve(\"%s\") failed %E", name, nxt_errno);
409 
410         exit(1);
411         nxt_unreachable();
412         break;
413 
414     default:
415         /* A parent. */
416         nxt_debug(task, "vfork(): %PI", pid);
417         break;
418     }
419 
420     return pid;
421 }
422 
423 #endif
424 
425 
426 nxt_int_t
427 nxt_process_daemon(nxt_task_t *task)
428 {
429     nxt_fd_t      fd;
430     nxt_pid_t     pid;
431     const char    *msg;
432 
433     fd = -1;
434 
435     /*
436      * fork() followed by a parent process's exit() detaches a child process
437      * from an init script or terminal shell process which has started the
438      * parent process and allows the child process to run in background.
439      */
440 
441     pid = fork();
442 
443     switch (pid) {
444 
445     case -1:
446         msg = "fork() failed %E";
447         goto fail;
448 
449     case 0:
450         /* A child. */
451         break;
452 
453     default:
454         /* A parent. */
455         nxt_debug(task, "fork(): %PI", pid);
456         exit(0);
457         nxt_unreachable();
458     }
459 
460     nxt_pid = getpid();
461 
462     /* Clean inherited cached thread tid. */
463     task->thread->tid = 0;
464 
465     nxt_debug(task, "daemon");
466 
467     /* Detach from controlling terminal. */
468 
469     if (setsid() == -1) {
470         nxt_alert(task, "setsid() failed %E", nxt_errno);
471         return NXT_ERROR;
472     }
473 
474     /*
475      * Reset file mode creation mask: any access
476      * rights can be set on file creation.
477      */
478     umask(0);
479 
480     /* Redirect STDIN and STDOUT to the "/dev/null". */
481 
482     fd = open("/dev/null", O_RDWR);
483     if (fd == -1) {
484         msg = "open(\"/dev/null\") failed %E";
485         goto fail;
486     }
487 
488     if (dup2(fd, STDIN_FILENO) == -1) {
489         msg = "dup2(\"/dev/null\", STDIN) failed %E";
490         goto fail;
491     }
492 
493     if (dup2(fd, STDOUT_FILENO) == -1) {
494         msg = "dup2(\"/dev/null\", STDOUT) failed %E";
495         goto fail;
496     }
497 
498     if (fd > STDERR_FILENO) {
499         nxt_fd_close(fd);
500     }
501 
502     return NXT_OK;
503 
504 fail:
505 
506     nxt_alert(task, msg, nxt_errno);
507 
508     if (fd != -1) {
509         nxt_fd_close(fd);
510     }
511 
512     return NXT_ERROR;
513 }
514 
515 
516 void
517 nxt_nanosleep(nxt_nsec_t ns)
518 {
519     struct timespec  ts;
520 
521     ts.tv_sec = ns / 1000000000;
522     ts.tv_nsec = ns % 1000000000;
523 
524     (void) nanosleep(&ts, NULL);
525 }
526 
527 
528 nxt_int_t
529 nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group)
530 {
531     struct group   *grp;
532     struct passwd  *pwd;
533 
534     nxt_errno = 0;
535 
536     pwd = getpwnam(uc->user);
537 
538     if (nxt_slow_path(pwd == NULL)) {
539 
540         if (nxt_errno == 0) {
541             nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found",
542                       uc->user, uc->user);
543         } else {
544             nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno);
545         }
546 
547         return NXT_ERROR;
548     }
549 
550     uc->uid = pwd->pw_uid;
551     uc->base_gid = pwd->pw_gid;
552 
553     if (group != NULL && group[0] != '\0') {
554         nxt_errno = 0;
555 
556         grp = getgrnam(group);
557 
558         if (nxt_slow_path(grp == NULL)) {
559 
560             if (nxt_errno == 0) {
561                 nxt_alert(task,
562                           "getgrnam(\"%s\") failed, group \"%s\" not found",
563                           group, group);
564             } else {
565                 nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno);
566             }
567 
568             return NXT_ERROR;
569         }
570 
571         uc->base_gid = grp->gr_gid;
572     }
573 
574     return nxt_user_groups_get(task, uc);
575 }
576 
577 
578 /*
579  * nxt_user_groups_get() stores an array of groups IDs which should be
580  * set by the initgroups() function for a given user.  The initgroups()
581  * may block a just forked worker process for some time if LDAP or NIS+
582  * is used, so nxt_user_groups_get() allows to get worker user groups in
583  * main process.  In a nutshell the initgroups() calls getgrouplist()
584  * followed by setgroups().  However Solaris lacks the getgrouplist().
585  * Besides getgrouplist() does not allow to query the exact number of
586  * groups while NGROUPS_MAX can be quite large (e.g. 65536 on Linux).
587  * So nxt_user_groups_get() emulates getgrouplist(): at first the function
588  * saves the super-user groups IDs, then calls initgroups() and saves the
589  * specified user groups IDs, and then restores the super-user groups IDs.
590  * This works at least on Linux, FreeBSD, and Solaris, but does not work
591  * on MacOSX, getgroups(2):
592  *
593  *   To provide compatibility with applications that use getgroups() in
594  *   environments where users may be in more than {NGROUPS_MAX} groups,
595  *   a variant of getgroups(), obtained when compiling with either the
596  *   macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can
597  *   be used that is not limited to {NGROUPS_MAX} groups.  However, this
598  *   variant only returns the user's default group access list and not
599  *   the group list modified by a call to setgroups(2).
600  *
601  * For such cases initgroups() is used in worker process as fallback.
602  */
603 
604 static nxt_int_t
605 nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc)
606 {
607     int        nsaved, ngroups;
608     nxt_int_t  ret;
609     nxt_gid_t  *saved;
610 
611     nsaved = getgroups(0, NULL);
612 
613     if (nsaved == -1) {
614         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
615         return NXT_ERROR;
616     }
617 
618     nxt_debug(task, "getgroups(0, NULL): %d", nsaved);
619 
620     if (nsaved > NGROUPS_MAX) {
621         /* MacOSX case. */
622 
623         uc->gids = NULL;
624         uc->ngroups = 0;
625 
626         return NXT_OK;
627     }
628 
629     saved = nxt_malloc(nsaved * sizeof(nxt_gid_t));
630 
631     if (saved == NULL) {
632         return NXT_ERROR;
633     }
634 
635     ret = NXT_ERROR;
636 
637     nsaved = getgroups(nsaved, saved);
638 
639     if (nsaved == -1) {
640         nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno);
641         goto free;
642     }
643 
644     nxt_debug(task, "getgroups(): %d", nsaved);
645 
646     if (initgroups(uc->user, uc->base_gid) != 0) {
647         if (nxt_errno == NXT_EPERM) {
648             nxt_log(task, NXT_LOG_NOTICE,
649                     "initgroups(%s, %d) failed %E, ignored",
650                     uc->user, uc->base_gid, nxt_errno);
651 
652             ret = NXT_OK;
653 
654             goto free;
655 
656         } else {
657             nxt_alert(task, "initgroups(%s, %d) failed %E",
658                       uc->user, uc->base_gid, nxt_errno);
659             goto restore;
660         }
661     }
662 
663     ngroups = getgroups(0, NULL);
664 
665     if (ngroups == -1) {
666         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
667         goto restore;
668     }
669 
670     nxt_debug(task, "getgroups(0, NULL): %d", ngroups);
671 
672     uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t));
673 
674     if (uc->gids == NULL) {
675         goto restore;
676     }
677 
678     ngroups = getgroups(ngroups, uc->gids);
679 
680     if (ngroups == -1) {
681         nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno);
682         goto restore;
683     }
684 
685     uc->ngroups = ngroups;
686 
687 #if (NXT_DEBUG)
688     {
689         u_char      *p, *end;
690         nxt_uint_t  i;
691         u_char      msg[NXT_MAX_ERROR_STR];
692 
693         p = msg;
694         end = msg + NXT_MAX_ERROR_STR;
695 
696         for (i = 0; i < uc->ngroups; i++) {
697             p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]);
698         }
699 
700         nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s",
701                   uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid,
702                   p - msg, msg);
703     }
704 #endif
705 
706     ret = NXT_OK;
707 
708 restore:
709 
710     if (setgroups(nsaved, saved) != 0) {
711         nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno);
712         ret = NXT_ERROR;
713     }
714 
715 free:
716 
717     nxt_free(saved);
718 
719     return ret;
720 }
721 
722 
723 nxt_int_t
724 nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
725 {
726     nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
727               uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid);
728 
729     if (setgid(uc->base_gid) != 0) {
730         nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno);
731         return NXT_ERROR;
732     }
733 
734     if (uc->gids != NULL) {
735         if (setgroups(uc->ngroups, uc->gids) != 0) {
736             nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno);
737             return NXT_ERROR;
738         }
739 
740     } else {
741         /* MacOSX fallback. */
742         if (initgroups(uc->user, uc->base_gid) != 0) {
743             nxt_alert(task, "initgroups(%s, %d) failed %E",
744                       uc->user, uc->base_gid, nxt_errno);
745             return NXT_ERROR;
746         }
747     }
748 
749     if (setuid(uc->uid) != 0) {
750         nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno);
751         return NXT_ERROR;
752     }
753 
754     return NXT_OK;
755 }
756 
757 
758 void
759 nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
760 {
761     nxt_assert(port->process == NULL);
762 
763     port->process = process;
764     nxt_queue_insert_tail(&process->ports, &port->link);
765 
766     nxt_process_use(task, process, 1);
767 }
768 
769 
770 nxt_process_type_t
771 nxt_process_type(nxt_process_t *process)
772 {
773     return nxt_queue_is_empty(&process->ports) ? 0 :
774         (nxt_process_port_first(process))->type;
775 }
776 
777 
778 void
779 nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
780 {
781     nxt_port_t  *port;
782 
783     nxt_process_port_each(process, port) {
784 
785         nxt_port_close(task, port);
786 
787         nxt_runtime_port_remove(task, port);
788 
789     } nxt_process_port_loop;
790 }
791 
792 
793 void
794 nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
795 {
796     nxt_thread_mutex_lock(&process->cp_mutex);
797 
798     nxt_port_hash_add(&process->connected_ports, port);
799 
800     nxt_thread_mutex_unlock(&process->cp_mutex);
801 }
802 
803 
804 void
805 nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
806 {
807     nxt_thread_mutex_lock(&process->cp_mutex);
808 
809     nxt_port_hash_remove(&process->connected_ports, port);
810 
811     nxt_thread_mutex_unlock(&process->cp_mutex);
812 }
813 
814 
815 nxt_port_t *
816 nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
817     nxt_port_id_t port_id)
818 {
819     nxt_port_t  *res;
820 
821     nxt_thread_mutex_lock(&process->cp_mutex);
822 
823     res = nxt_port_hash_find(&process->connected_ports, pid, port_id);
824 
825     nxt_thread_mutex_unlock(&process->cp_mutex);
826 
827     return res;
828 }
829