xref: /unit/src/nxt_process.c (revision 1182:325b315e48c4)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_main_process.h>
9 
10 #if (NXT_HAVE_CLONE)
11 #include <nxt_clone.h>
12 #endif
13 
14 #include <signal.h>
15 
16 static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
17 static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
18 static nxt_int_t nxt_process_worker_setup(nxt_task_t *task,
19     nxt_process_t *process, int parentfd);
20 
/*
 * A cached process pid.  In the code visible here it is set from the pid
 * sent by the parent in nxt_process_worker_setup() and from getpid() in
 * nxt_process_daemon().
 */
nxt_pid_t  nxt_pid;

/* An original parent process pid.  Not assigned in the code visible here. */
nxt_pid_t  nxt_ppid;
26 
/*
 * Port connectivity between process types, indexed as
 * nxt_proc_conn_matrix[type of this process][type of the other process];
 * both indexes are nxt_process_type_t values.  A zero entry means this
 * process does not need the other one: nxt_process_worker_setup() closes
 * the ports of every inherited process whose entry is zero.
 */
nxt_bool_t  nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
    { 1, 1, 1, 1, 1 },
    { 1, 0, 0, 0, 0 },
    { 1, 0, 0, 1, 0 },
    { 1, 0, 1, 0, 1 },
    { 1, 0, 0, 0, 0 },
};
34 
/*
 * NOTE(review): not referenced in the code visible here.  Presumably
 * entry [a][b] tells whether processes of one type must be notified when
 * a process of the other type is removed; confirm the index order and the
 * exact meaning against the callers.
 */
nxt_bool_t  nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
    { 0, 0, 0, 0, 0 },
    { 0, 0, 0, 0, 0 },
    { 0, 0, 0, 1, 0 },
    { 0, 0, 1, 0, 1 },
    { 0, 0, 0, 1, 0 },
};
42 
43 
44 static nxt_int_t
45 nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) {
46     pid_t               rpid, pid;
47     ssize_t             n;
48     nxt_int_t           parent_status;
49     nxt_process_t       *p;
50     nxt_runtime_t       *rt;
51     nxt_process_init_t  *init;
52     nxt_process_type_t  ptype;
53 
54     pid  = getpid();
55     rpid = 0;
56     rt   = task->thread->runtime;
57     init = process->init;
58 
59     /* Setup the worker process. */
60 
61     n = read(parentfd, &rpid, sizeof(rpid));
62     if (nxt_slow_path(n == -1 || n != sizeof(rpid))) {
63         nxt_alert(task, "failed to read real pid");
64         return NXT_ERROR;
65     }
66 
67     if (nxt_slow_path(rpid == 0)) {
68         nxt_alert(task, "failed to get real pid from parent");
69         return NXT_ERROR;
70     }
71 
72     nxt_pid = rpid;
73 
74     /* Clean inherited cached thread tid. */
75     task->thread->tid = 0;
76 
77     process->pid = nxt_pid;
78 
79     if (nxt_pid != pid) {
80         nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid);
81         nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid);
82     }
83 
84     n = read(parentfd, &parent_status, sizeof(parent_status));
85     if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) {
86         nxt_alert(task, "failed to read parent status");
87         return NXT_ERROR;
88     }
89 
90     if (nxt_slow_path(close(parentfd) == -1)) {
91         nxt_alert(task, "failed to close reader pipe fd");
92         return NXT_ERROR;
93     }
94 
95     if (nxt_slow_path(parent_status != NXT_OK)) {
96         return parent_status;
97     }
98 
99     ptype = init->type;
100 
101     nxt_port_reset_next_id();
102 
103     nxt_event_engine_thread_adopt(task->thread->engine);
104 
105     /* Remove not ready processes. */
106     nxt_runtime_process_each(rt, p) {
107 
108         if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0) {
109             nxt_debug(task, "remove not required process %PI", p->pid);
110 
111             nxt_process_close_ports(task, p);
112 
113             continue;
114         }
115 
116         if (!p->ready) {
117             nxt_debug(task, "remove not ready process %PI", p->pid);
118 
119             nxt_process_close_ports(task, p);
120 
121             continue;
122         }
123 
124         nxt_port_mmaps_destroy(&p->incoming, 0);
125         nxt_port_mmaps_destroy(&p->outgoing, 0);
126 
127     } nxt_runtime_process_loop;
128 
129     nxt_runtime_process_add(task, process);
130 
131     nxt_process_start(task, process);
132 
133     process->ready = 1;
134 
135     return NXT_OK;
136 }
137 
138 
139 nxt_pid_t
140 nxt_process_create(nxt_task_t *task, nxt_process_t *process)
141 {
142     int                 pipefd[2];
143     nxt_int_t           ret;
144     nxt_pid_t           pid;
145     nxt_process_init_t  *init;
146 
147     if (nxt_slow_path(pipe(pipefd) == -1)) {
148         nxt_alert(task, "failed to create process pipe for passing rpid");
149         return -1;
150     }
151 
152     init = process->init;
153 
154 #if (NXT_HAVE_CLONE)
155     pid = nxt_clone(SIGCHLD|init->isolation.clone.flags);
156 #else
157     pid = fork();
158 #endif
159 
160     if (nxt_slow_path(pid < 0)) {
161 #if (NXT_HAVE_CLONE)
162         nxt_alert(task, "clone() failed while creating \"%s\" %E",
163                   init->name, nxt_errno);
164 #else
165         nxt_alert(task, "fork() failed while creating \"%s\" %E",
166                   init->name, nxt_errno);
167 #endif
168 
169         return pid;
170     }
171 
172     if (pid == 0) {
173         /* Child. */
174 
175         if (nxt_slow_path(close(pipefd[1]) == -1)) {
176             nxt_alert(task, "failed to close writer pipe fd");
177             return NXT_ERROR;
178         }
179 
180         ret = nxt_process_worker_setup(task, process, pipefd[0]);
181         if (nxt_slow_path(ret != NXT_OK)) {
182             exit(1);
183         }
184 
185         /*
186          * Explicitly return 0 to notice the caller function this is the child.
187          * The caller must return to the event engine work queue loop.
188          */
189         return 0;
190     }
191 
192     /* Parent. */
193 
194     if (nxt_slow_path(close(pipefd[0]) != 0)) {
195         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
196     }
197 
198     /*
199      * At this point, the child process is blocked reading the
200      * pipe fd to get its real pid (rpid).
201      *
202      * If anything goes wrong now, we need to terminate the child
203      * process by sending a NXT_ERROR in the pipe.
204      */
205 
206 #if (NXT_HAVE_CLONE)
207     nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid);
208 #else
209     nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid);
210 #endif
211 
212     if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) {
213         nxt_alert(task, "failed to write real pid");
214         goto fail_cleanup;
215     }
216 
217 #if (NXT_HAVE_CLONE_NEWUSER)
218     if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) {
219         ret = nxt_clone_proc_map(task, pid, &init->isolation.clone);
220         if (nxt_slow_path(ret != NXT_OK)) {
221             goto fail_cleanup;
222         }
223     }
224 #endif
225 
226     ret = NXT_OK;
227 
228     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
229         nxt_alert(task, "failed to write status");
230         goto fail_cleanup;
231     }
232 
233     process->pid = pid;
234 
235     nxt_runtime_process_add(task, process);
236 
237     return pid;
238 
239 fail_cleanup:
240 
241     ret = NXT_ERROR;
242 
243     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
244         nxt_alert(task, "failed to write status");
245     }
246 
247     if (nxt_slow_path(close(pipefd[1]) != 0)) {
248         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
249     }
250 
251     waitpid(pid, NULL, 0);
252 
253     return -1;
254 }
255 
256 
/*
 * Finishes the start-up of a new process; called from
 * nxt_process_worker_setup().
 *
 * In order: logs the start, sets the process title, seeds the thread's
 * random generator, switches credentials (when permitted and configured),
 * updates the inherited event engine, creates the thread pool, adjusts
 * the main port and the process's own first port, runs the type-specific
 * init->start() handler and finally sends NXT_PORT_MSG_PROCESS_READY to
 * the main process.
 *
 * Does not report errors to the caller: any failure ends the process
 * with exit(1).
 */
static void
nxt_process_start(nxt_task_t *task, nxt_process_t *process)
{
    nxt_int_t                    ret;
    nxt_port_t                   *port, *main_port;
    nxt_thread_t                 *thread;
    nxt_runtime_t                *rt;
    nxt_process_init_t           *init;
    nxt_event_engine_t           *engine;
    const nxt_event_interface_t  *interface;

    init = process->init;

    nxt_log(task, NXT_LOG_INFO, "%s started", init->name);

    nxt_process_title(task, "unit: %s", init->name);

    thread = task->thread;
    rt     = thread->runtime;

    nxt_random_init(&thread->random);

    /* Credentials are changed only if the runtime has the setid capability. */
    if (rt->capabilities.setid && init->user_cred != NULL) {
        ret = nxt_user_cred_set(task, init->user_cred);
        if (ret != NXT_OK) {
            goto fail;
        }
    }

    rt->type = init->type;

    engine = thread->engine;

    /* Update inherited main process event engine and signals processing. */
    engine->signals->sigev = init->signals;

    interface = nxt_service_get(rt->services, "engine", rt->engine);
    if (nxt_slow_path(interface == NULL)) {
        goto fail;
    }

    if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
        goto fail;
    }

    ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
                                         60000 * 1000000LL);
    if (nxt_slow_path(ret != NXT_OK)) {
        goto fail;
    }

    main_port = rt->port_by_type[NXT_PROCESS_MAIN];

    /* The main port is closed for reading and enabled for writing. */
    nxt_port_read_close(main_port);
    nxt_port_write_enable(task, main_port);

    port = nxt_process_port_first(process);

    /* The write side of this process's own first port is closed. */
    nxt_port_write_close(port);

    /* Run the process type specific start handler. */
    ret = init->start(task, init->data);

    if (nxt_slow_path(ret != NXT_OK)) {
        goto fail;
    }

    nxt_port_enable(task, port, init->port_handlers);

    ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
                                -1, init->stream, 0, NULL);

    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main");

        goto fail;
    }

    return;

fail:

    exit(1);
}
340 
341 
342 #if (NXT_HAVE_POSIX_SPAWN)
343 
344 /*
345  * Linux glibc 2.2 posix_spawn() is implemented via fork()/execve().
346  * Linux glibc 2.4 posix_spawn() without file actions and spawn
347  * attributes uses vfork()/execve().
348  *
349  * On FreeBSD 8.0 posix_spawn() is implemented via vfork()/execve().
350  *
351  * Solaris 10:
352  *   In the Solaris 10 OS, posix_spawn() is currently implemented using
353  *   private-to-libc vfork(), execve(), and exit() functions.  They are
354  *   identical to regular vfork(), execve(), and exit() in functionality,
355  *   but they are not exported from libc and therefore don't cause the
356  *   deadlock-in-the-dynamic-linker problem that any multithreaded code
357  *   outside of libc that calls vfork() can cause.
358  *
 * On MacOSX 10.5 (Leopard) and NetBSD 6.0 posix_spawn() is implemented
 * as a syscall.
361  */
362 
363 nxt_pid_t
364 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
365 {
366     nxt_pid_t  pid;
367 
368     nxt_debug(task, "posix_spawn(\"%s\")", name);
369 
370     if (posix_spawn(&pid, name, NULL, NULL, argv, envp) != 0) {
371         nxt_alert(task, "posix_spawn(\"%s\") failed %E", name, nxt_errno);
372         return -1;
373     }
374 
375     return pid;
376 }
377 
378 #else
379 
380 nxt_pid_t
381 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
382 {
383     nxt_pid_t  pid;
384 
385     /*
386      * vfork() is better than fork() because:
387      *   it is faster several times;
388      *   its execution time does not depend on private memory mapping size;
389      *   it has lesser chances to fail due to the ENOMEM error.
390      */
391 
392     pid = vfork();
393 
394     switch (pid) {
395 
396     case -1:
397         nxt_alert(task, "vfork() failed while executing \"%s\" %E",
398                   name, nxt_errno);
399         break;
400 
401     case 0:
402         /* A child. */
403         nxt_debug(task, "execve(\"%s\")", name);
404 
405         (void) execve(name, argv, envp);
406 
407         nxt_alert(task, "execve(\"%s\") failed %E", name, nxt_errno);
408 
409         exit(1);
410         nxt_unreachable();
411         break;
412 
413     default:
414         /* A parent. */
415         nxt_debug(task, "vfork(): %PI", pid);
416         break;
417     }
418 
419     return pid;
420 }
421 
422 #endif
423 
424 
425 nxt_int_t
426 nxt_process_daemon(nxt_task_t *task)
427 {
428     nxt_fd_t      fd;
429     nxt_pid_t     pid;
430     const char    *msg;
431 
432     fd = -1;
433 
434     /*
435      * fork() followed by a parent process's exit() detaches a child process
436      * from an init script or terminal shell process which has started the
437      * parent process and allows the child process to run in background.
438      */
439 
440     pid = fork();
441 
442     switch (pid) {
443 
444     case -1:
445         msg = "fork() failed %E";
446         goto fail;
447 
448     case 0:
449         /* A child. */
450         break;
451 
452     default:
453         /* A parent. */
454         nxt_debug(task, "fork(): %PI", pid);
455         exit(0);
456         nxt_unreachable();
457     }
458 
459     nxt_pid = getpid();
460 
461     /* Clean inherited cached thread tid. */
462     task->thread->tid = 0;
463 
464     nxt_debug(task, "daemon");
465 
466     /* Detach from controlling terminal. */
467 
468     if (setsid() == -1) {
469         nxt_alert(task, "setsid() failed %E", nxt_errno);
470         return NXT_ERROR;
471     }
472 
473     /*
474      * Reset file mode creation mask: any access
475      * rights can be set on file creation.
476      */
477     umask(0);
478 
479     /* Redirect STDIN and STDOUT to the "/dev/null". */
480 
481     fd = open("/dev/null", O_RDWR);
482     if (fd == -1) {
483         msg = "open(\"/dev/null\") failed %E";
484         goto fail;
485     }
486 
487     if (dup2(fd, STDIN_FILENO) == -1) {
488         msg = "dup2(\"/dev/null\", STDIN) failed %E";
489         goto fail;
490     }
491 
492     if (dup2(fd, STDOUT_FILENO) == -1) {
493         msg = "dup2(\"/dev/null\", STDOUT) failed %E";
494         goto fail;
495     }
496 
497     if (fd > STDERR_FILENO) {
498         nxt_fd_close(fd);
499     }
500 
501     return NXT_OK;
502 
503 fail:
504 
505     nxt_alert(task, msg, nxt_errno);
506 
507     if (fd != -1) {
508         nxt_fd_close(fd);
509     }
510 
511     return NXT_ERROR;
512 }
513 
514 
515 void
516 nxt_nanosleep(nxt_nsec_t ns)
517 {
518     struct timespec  ts;
519 
520     ts.tv_sec = ns / 1000000000;
521     ts.tv_nsec = ns % 1000000000;
522 
523     (void) nanosleep(&ts, NULL);
524 }
525 
526 
527 nxt_int_t
528 nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group)
529 {
530     struct group   *grp;
531     struct passwd  *pwd;
532 
533     nxt_errno = 0;
534 
535     pwd = getpwnam(uc->user);
536 
537     if (nxt_slow_path(pwd == NULL)) {
538 
539         if (nxt_errno == 0) {
540             nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found",
541                       uc->user, uc->user);
542         } else {
543             nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno);
544         }
545 
546         return NXT_ERROR;
547     }
548 
549     uc->uid = pwd->pw_uid;
550     uc->base_gid = pwd->pw_gid;
551 
552     if (group != NULL && group[0] != '\0') {
553         nxt_errno = 0;
554 
555         grp = getgrnam(group);
556 
557         if (nxt_slow_path(grp == NULL)) {
558 
559             if (nxt_errno == 0) {
560                 nxt_alert(task,
561                           "getgrnam(\"%s\") failed, group \"%s\" not found",
562                           group, group);
563             } else {
564                 nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno);
565             }
566 
567             return NXT_ERROR;
568         }
569 
570         uc->base_gid = grp->gr_gid;
571     }
572 
573     return nxt_user_groups_get(task, uc);
574 }
575 
576 
577 /*
578  * nxt_user_groups_get() stores an array of groups IDs which should be
579  * set by the initgroups() function for a given user.  The initgroups()
 * may block a just forked worker process for some time if LDAP or NIS+
 * is used, so nxt_user_groups_get() allows getting worker user groups in
 * the main process.  In a nutshell the initgroups() calls getgrouplist()
583  * followed by setgroups().  However Solaris lacks the getgrouplist().
584  * Besides getgrouplist() does not allow to query the exact number of
585  * groups while NGROUPS_MAX can be quite large (e.g. 65536 on Linux).
586  * So nxt_user_groups_get() emulates getgrouplist(): at first the function
587  * saves the super-user groups IDs, then calls initgroups() and saves the
588  * specified user groups IDs, and then restores the super-user groups IDs.
589  * This works at least on Linux, FreeBSD, and Solaris, but does not work
590  * on MacOSX, getgroups(2):
591  *
592  *   To provide compatibility with applications that use getgroups() in
593  *   environments where users may be in more than {NGROUPS_MAX} groups,
594  *   a variant of getgroups(), obtained when compiling with either the
595  *   macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can
596  *   be used that is not limited to {NGROUPS_MAX} groups.  However, this
597  *   variant only returns the user's default group access list and not
598  *   the group list modified by a call to setgroups(2).
599  *
600  * For such cases initgroups() is used in worker process as fallback.
601  */
602 
603 static nxt_int_t
604 nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc)
605 {
606     int        nsaved, ngroups;
607     nxt_int_t  ret;
608     nxt_gid_t  *saved;
609 
610     nsaved = getgroups(0, NULL);
611 
612     if (nsaved == -1) {
613         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
614         return NXT_ERROR;
615     }
616 
617     nxt_debug(task, "getgroups(0, NULL): %d", nsaved);
618 
619     if (nsaved > NGROUPS_MAX) {
620         /* MacOSX case. */
621 
622         uc->gids = NULL;
623         uc->ngroups = 0;
624 
625         return NXT_OK;
626     }
627 
628     saved = nxt_malloc(nsaved * sizeof(nxt_gid_t));
629 
630     if (saved == NULL) {
631         return NXT_ERROR;
632     }
633 
634     ret = NXT_ERROR;
635 
636     nsaved = getgroups(nsaved, saved);
637 
638     if (nsaved == -1) {
639         nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno);
640         goto free;
641     }
642 
643     nxt_debug(task, "getgroups(): %d", nsaved);
644 
645     if (initgroups(uc->user, uc->base_gid) != 0) {
646         if (nxt_errno == NXT_EPERM) {
647             nxt_log(task, NXT_LOG_NOTICE,
648                     "initgroups(%s, %d) failed %E, ignored",
649                     uc->user, uc->base_gid, nxt_errno);
650 
651             ret = NXT_OK;
652 
653             goto free;
654 
655         } else {
656             nxt_alert(task, "initgroups(%s, %d) failed %E",
657                       uc->user, uc->base_gid, nxt_errno);
658             goto restore;
659         }
660     }
661 
662     ngroups = getgroups(0, NULL);
663 
664     if (ngroups == -1) {
665         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
666         goto restore;
667     }
668 
669     nxt_debug(task, "getgroups(0, NULL): %d", ngroups);
670 
671     uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t));
672 
673     if (uc->gids == NULL) {
674         goto restore;
675     }
676 
677     ngroups = getgroups(ngroups, uc->gids);
678 
679     if (ngroups == -1) {
680         nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno);
681         goto restore;
682     }
683 
684     uc->ngroups = ngroups;
685 
686 #if (NXT_DEBUG)
687     {
688         u_char      *p, *end;
689         nxt_uint_t  i;
690         u_char      msg[NXT_MAX_ERROR_STR];
691 
692         p = msg;
693         end = msg + NXT_MAX_ERROR_STR;
694 
695         for (i = 0; i < uc->ngroups; i++) {
696             p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]);
697         }
698 
699         nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s",
700                   uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid,
701                   p - msg, msg);
702     }
703 #endif
704 
705     ret = NXT_OK;
706 
707 restore:
708 
709     if (setgroups(nsaved, saved) != 0) {
710         nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno);
711         ret = NXT_ERROR;
712     }
713 
714 free:
715 
716     nxt_free(saved);
717 
718     return ret;
719 }
720 
721 
722 nxt_int_t
723 nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
724 {
725     nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
726               uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid);
727 
728     if (setgid(uc->base_gid) != 0) {
729         nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno);
730         return NXT_ERROR;
731     }
732 
733     if (uc->gids != NULL) {
734         if (setgroups(uc->ngroups, uc->gids) != 0) {
735             nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno);
736             return NXT_ERROR;
737         }
738 
739     } else {
740         /* MacOSX fallback. */
741         if (initgroups(uc->user, uc->base_gid) != 0) {
742             nxt_alert(task, "initgroups(%s, %d) failed %E",
743                       uc->user, uc->base_gid, nxt_errno);
744             return NXT_ERROR;
745         }
746     }
747 
748     if (setuid(uc->uid) != 0) {
749         nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno);
750         return NXT_ERROR;
751     }
752 
753     return NXT_OK;
754 }
755 
756 
/*
 * Attaches "port" to "process": sets the port's back pointer, appends the
 * port to the tail of process->ports and calls nxt_process_use() with 1
 * (presumably taking a reference on the process for the port; confirm
 * against nxt_process_use()).  The port must not belong to a process yet.
 */
void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
    nxt_assert(port->process == NULL);

    port->process = process;
    nxt_queue_insert_tail(&process->ports, &port->link);

    nxt_process_use(task, process, 1);
}
767 
768 
769 nxt_process_type_t
770 nxt_process_type(nxt_process_t *process)
771 {
772     return nxt_queue_is_empty(&process->ports) ? 0 :
773         (nxt_process_port_first(process))->type;
774 }
775 
776 
/*
 * Closes every port of "process" and removes each of them from the
 * runtime.
 */
void
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
{
    nxt_port_t  *port;

    nxt_process_port_each(process, port) {

        nxt_port_close(task, port);

        nxt_runtime_port_remove(task, port);

    } nxt_process_port_loop;
}
790 
791 
/*
 * Adds "port" to the process's connected ports hash while holding
 * process->cp_mutex.
 */
void
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
{
    nxt_thread_mutex_lock(&process->cp_mutex);

    nxt_port_hash_add(&process->connected_ports, port);

    nxt_thread_mutex_unlock(&process->cp_mutex);
}
801 
802 
/*
 * Removes "port" from the process's connected ports hash while holding
 * process->cp_mutex.
 */
void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{
    nxt_thread_mutex_lock(&process->cp_mutex);

    nxt_port_hash_remove(&process->connected_ports, port);

    nxt_thread_mutex_unlock(&process->cp_mutex);
}
812 
813 
/*
 * Looks up a port by "pid" and "port_id" in the process's connected ports
 * hash while holding process->cp_mutex, and returns whatever
 * nxt_port_hash_find() returned (presumably NULL when there is no such
 * port; confirm against nxt_port_hash_find()).
 */
nxt_port_t *
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
    nxt_port_id_t port_id)
{
    nxt_port_t  *res;

    nxt_thread_mutex_lock(&process->cp_mutex);

    res = nxt_port_hash_find(&process->connected_ports, pid, port_id);

    nxt_thread_mutex_unlock(&process->cp_mutex);

    return res;
}
828