xref: /unit/src/nxt_process.c (revision 1209:a3dfe6c407b6)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_main_process.h>
9 
10 #if (NXT_HAVE_CLONE)
11 #include <nxt_clone.h>
12 #endif
13 
14 #include <signal.h>
15 
/* Forward declarations of the file-local helpers defined later in this file. */
static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
static nxt_int_t nxt_process_worker_setup(nxt_task_t *task,
    nxt_process_t *process, int parentfd);
20 
/*
 * A cached process pid.  In this file it is set to the real pid received
 * from the parent in nxt_process_worker_setup() and to getpid() in
 * nxt_process_daemon().
 */
nxt_pid_t  nxt_pid;

/*
 * An original parent process pid.  It is not assigned anywhere in the
 * visible part of this file.
 */
nxt_pid_t  nxt_ppid;
26 
/*
 * Port connectivity matrix, indexed as
 * [type of the process being started][type of an existing process].
 * nxt_process_worker_setup() consults it right after fork/clone: a zero
 * entry means the new process does not need the existing one, so the ports
 * of that process are closed in the child.
 * NOTE(review): rows and columns presumably follow the nxt_process_type_t
 * enumeration order -- confirm against its declaration.
 */
nxt_bool_t  nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
    { 1, 1, 1, 1, 1 },
    { 1, 0, 0, 0, 0 },
    { 1, 0, 0, 1, 0 },
    { 1, 0, 1, 0, 1 },
    { 1, 0, 0, 0, 0 },
};
34 
/*
 * Process removal notification matrix with the same dimensions as
 * nxt_proc_conn_matrix.  It is not referenced in the visible part of this
 * file.
 * NOTE(review): presumably a non-zero entry marks a pair of process types
 * where one is told about the removal of the other -- confirm the index
 * order at the point of use.
 */
nxt_bool_t  nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
    { 0, 0, 0, 0, 0 },
    { 0, 0, 0, 0, 0 },
    { 0, 0, 0, 1, 0 },
    { 0, 0, 1, 0, 1 },
    { 0, 0, 0, 1, 0 },
};
42 
43 
44 static nxt_int_t
45 nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) {
46     pid_t               rpid, pid;
47     ssize_t             n;
48     nxt_int_t           parent_status;
49     nxt_process_t       *p;
50     nxt_runtime_t       *rt;
51     nxt_process_init_t  *init;
52     nxt_process_type_t  ptype;
53 
54     pid  = getpid();
55     rpid = 0;
56     rt   = task->thread->runtime;
57     init = process->init;
58 
59     /* Setup the worker process. */
60 
61     n = read(parentfd, &rpid, sizeof(rpid));
62     if (nxt_slow_path(n == -1 || n != sizeof(rpid))) {
63         nxt_alert(task, "failed to read real pid");
64         return NXT_ERROR;
65     }
66 
67     if (nxt_slow_path(rpid == 0)) {
68         nxt_alert(task, "failed to get real pid from parent");
69         return NXT_ERROR;
70     }
71 
72     nxt_pid = rpid;
73 
74     /* Clean inherited cached thread tid. */
75     task->thread->tid = 0;
76 
77     process->pid = nxt_pid;
78 
79     if (nxt_pid != pid) {
80         nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid);
81         nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid);
82     }
83 
84     n = read(parentfd, &parent_status, sizeof(parent_status));
85     if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) {
86         nxt_alert(task, "failed to read parent status");
87         return NXT_ERROR;
88     }
89 
90     if (nxt_slow_path(close(parentfd) == -1)) {
91         nxt_alert(task, "failed to close reader pipe fd");
92         return NXT_ERROR;
93     }
94 
95     if (nxt_slow_path(parent_status != NXT_OK)) {
96         return parent_status;
97     }
98 
99     ptype = init->type;
100 
101     nxt_port_reset_next_id();
102 
103     nxt_event_engine_thread_adopt(task->thread->engine);
104 
105     /* Remove not ready processes. */
106     nxt_runtime_process_each(rt, p) {
107 
108         if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0) {
109             nxt_debug(task, "remove not required process %PI", p->pid);
110 
111             nxt_process_close_ports(task, p);
112 
113             continue;
114         }
115 
116         if (!p->ready) {
117             nxt_debug(task, "remove not ready process %PI", p->pid);
118 
119             nxt_process_close_ports(task, p);
120 
121             continue;
122         }
123 
124         nxt_port_mmaps_destroy(&p->incoming, 0);
125         nxt_port_mmaps_destroy(&p->outgoing, 0);
126 
127     } nxt_runtime_process_loop;
128 
129     nxt_runtime_process_add(task, process);
130 
131     nxt_process_start(task, process);
132 
133     process->ready = 1;
134 
135     return NXT_OK;
136 }
137 
138 
139 nxt_pid_t
140 nxt_process_create(nxt_task_t *task, nxt_process_t *process)
141 {
142     int                 pipefd[2];
143     nxt_int_t           ret;
144     nxt_pid_t           pid;
145     nxt_process_init_t  *init;
146 
147     if (nxt_slow_path(pipe(pipefd) == -1)) {
148         nxt_alert(task, "failed to create process pipe for passing rpid");
149         return -1;
150     }
151 
152     init = process->init;
153 
154 #if (NXT_HAVE_CLONE)
155     pid = nxt_clone(SIGCHLD|init->isolation.clone.flags);
156 #else
157     pid = fork();
158 #endif
159 
160     if (nxt_slow_path(pid < 0)) {
161 #if (NXT_HAVE_CLONE)
162         nxt_alert(task, "clone() failed while creating \"%s\" %E",
163                   init->name, nxt_errno);
164 #else
165         nxt_alert(task, "fork() failed while creating \"%s\" %E",
166                   init->name, nxt_errno);
167 #endif
168 
169         if (nxt_slow_path(close(pipefd[0]) != 0)) {
170             nxt_alert(task, "failed to close pipe: %E", nxt_errno);
171         }
172 
173         if (nxt_slow_path(close(pipefd[1]) != 0)) {
174             nxt_alert(task, "failed to close pipe: %E", nxt_errno);
175         }
176 
177         return pid;
178     }
179 
180     if (pid == 0) {
181         /* Child. */
182 
183         if (nxt_slow_path(close(pipefd[1]) == -1)) {
184             nxt_alert(task, "failed to close writer pipe fd");
185             return NXT_ERROR;
186         }
187 
188         ret = nxt_process_worker_setup(task, process, pipefd[0]);
189         if (nxt_slow_path(ret != NXT_OK)) {
190             exit(1);
191         }
192 
193         /*
194          * Explicitly return 0 to notice the caller function this is the child.
195          * The caller must return to the event engine work queue loop.
196          */
197         return 0;
198     }
199 
200     /* Parent. */
201 
202     if (nxt_slow_path(close(pipefd[0]) != 0)) {
203         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
204     }
205 
206     /*
207      * At this point, the child process is blocked reading the
208      * pipe fd to get its real pid (rpid).
209      *
210      * If anything goes wrong now, we need to terminate the child
211      * process by sending a NXT_ERROR in the pipe.
212      */
213 
214 #if (NXT_HAVE_CLONE)
215     nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid);
216 #else
217     nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid);
218 #endif
219 
220     if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) {
221         nxt_alert(task, "failed to write real pid");
222         goto fail_cleanup;
223     }
224 
225 #if (NXT_HAVE_CLONE_NEWUSER)
226     if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) {
227         ret = nxt_clone_proc_map(task, pid, &init->isolation.clone);
228         if (nxt_slow_path(ret != NXT_OK)) {
229             goto fail_cleanup;
230         }
231     }
232 #endif
233 
234     ret = NXT_OK;
235 
236     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
237         nxt_alert(task, "failed to write status");
238         goto fail_cleanup;
239     }
240 
241     if (nxt_slow_path(close(pipefd[1]) != 0)) {
242         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
243     }
244 
245     process->pid = pid;
246 
247     nxt_runtime_process_add(task, process);
248 
249     return pid;
250 
251 fail_cleanup:
252 
253     ret = NXT_ERROR;
254 
255     if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
256         nxt_alert(task, "failed to write status");
257     }
258 
259     if (nxt_slow_path(close(pipefd[1]) != 0)) {
260         nxt_alert(task, "failed to close pipe: %E", nxt_errno);
261     }
262 
263     waitpid(pid, NULL, 0);
264 
265     return -1;
266 }
267 
268 
269 static void
270 nxt_process_start(nxt_task_t *task, nxt_process_t *process)
271 {
272     nxt_int_t                    ret;
273     nxt_port_t                   *port, *main_port;
274     nxt_thread_t                 *thread;
275     nxt_runtime_t                *rt;
276     nxt_process_init_t           *init;
277     nxt_event_engine_t           *engine;
278     const nxt_event_interface_t  *interface;
279 
280     init = process->init;
281 
282     nxt_log(task, NXT_LOG_INFO, "%s started", init->name);
283 
284     nxt_process_title(task, "unit: %s", init->name);
285 
286     thread = task->thread;
287     rt     = thread->runtime;
288 
289     nxt_random_init(&thread->random);
290 
291     if (rt->capabilities.setid && init->user_cred != NULL) {
292         ret = nxt_user_cred_set(task, init->user_cred);
293         if (ret != NXT_OK) {
294             goto fail;
295         }
296     }
297 
298     rt->type = init->type;
299 
300     engine = thread->engine;
301 
302     /* Update inherited main process event engine and signals processing. */
303     engine->signals->sigev = init->signals;
304 
305     interface = nxt_service_get(rt->services, "engine", rt->engine);
306     if (nxt_slow_path(interface == NULL)) {
307         goto fail;
308     }
309 
310     if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
311         goto fail;
312     }
313 
314     ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
315                                          60000 * 1000000LL);
316     if (nxt_slow_path(ret != NXT_OK)) {
317         goto fail;
318     }
319 
320     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
321 
322     nxt_port_read_close(main_port);
323     nxt_port_write_enable(task, main_port);
324 
325     port = nxt_process_port_first(process);
326 
327     nxt_port_write_close(port);
328 
329     ret = init->start(task, init->data);
330 
331     if (nxt_slow_path(ret != NXT_OK)) {
332         goto fail;
333     }
334 
335     nxt_port_enable(task, port, init->port_handlers);
336 
337     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
338                                 -1, init->stream, 0, NULL);
339 
340     if (nxt_slow_path(ret != NXT_OK)) {
341         nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main");
342 
343         goto fail;
344     }
345 
346     return;
347 
348 fail:
349 
350     exit(1);
351 }
352 
353 
354 #if (NXT_HAVE_POSIX_SPAWN)
355 
356 /*
357  * Linux glibc 2.2 posix_spawn() is implemented via fork()/execve().
358  * Linux glibc 2.4 posix_spawn() without file actions and spawn
359  * attributes uses vfork()/execve().
360  *
361  * On FreeBSD 8.0 posix_spawn() is implemented via vfork()/execve().
362  *
363  * Solaris 10:
364  *   In the Solaris 10 OS, posix_spawn() is currently implemented using
365  *   private-to-libc vfork(), execve(), and exit() functions.  They are
366  *   identical to regular vfork(), execve(), and exit() in functionality,
367  *   but they are not exported from libc and therefore don't cause the
368  *   deadlock-in-the-dynamic-linker problem that any multithreaded code
369  *   outside of libc that calls vfork() can cause.
370  *
371  * On MacOSX 10.5 (Leoprad) and NetBSD 6.0 posix_spawn() is implemented
372  * as syscall.
373  */
374 
375 nxt_pid_t
376 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
377 {
378     nxt_pid_t  pid;
379 
380     nxt_debug(task, "posix_spawn(\"%s\")", name);
381 
382     if (posix_spawn(&pid, name, NULL, NULL, argv, envp) != 0) {
383         nxt_alert(task, "posix_spawn(\"%s\") failed %E", name, nxt_errno);
384         return -1;
385     }
386 
387     return pid;
388 }
389 
390 #else
391 
392 nxt_pid_t
393 nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
394 {
395     nxt_pid_t  pid;
396 
397     /*
398      * vfork() is better than fork() because:
399      *   it is faster several times;
400      *   its execution time does not depend on private memory mapping size;
401      *   it has lesser chances to fail due to the ENOMEM error.
402      */
403 
404     pid = vfork();
405 
406     switch (pid) {
407 
408     case -1:
409         nxt_alert(task, "vfork() failed while executing \"%s\" %E",
410                   name, nxt_errno);
411         break;
412 
413     case 0:
414         /* A child. */
415         nxt_debug(task, "execve(\"%s\")", name);
416 
417         (void) execve(name, argv, envp);
418 
419         nxt_alert(task, "execve(\"%s\") failed %E", name, nxt_errno);
420 
421         exit(1);
422         nxt_unreachable();
423         break;
424 
425     default:
426         /* A parent. */
427         nxt_debug(task, "vfork(): %PI", pid);
428         break;
429     }
430 
431     return pid;
432 }
433 
434 #endif
435 
436 
437 nxt_int_t
438 nxt_process_daemon(nxt_task_t *task)
439 {
440     nxt_fd_t      fd;
441     nxt_pid_t     pid;
442     const char    *msg;
443 
444     fd = -1;
445 
446     /*
447      * fork() followed by a parent process's exit() detaches a child process
448      * from an init script or terminal shell process which has started the
449      * parent process and allows the child process to run in background.
450      */
451 
452     pid = fork();
453 
454     switch (pid) {
455 
456     case -1:
457         msg = "fork() failed %E";
458         goto fail;
459 
460     case 0:
461         /* A child. */
462         break;
463 
464     default:
465         /* A parent. */
466         nxt_debug(task, "fork(): %PI", pid);
467         exit(0);
468         nxt_unreachable();
469     }
470 
471     nxt_pid = getpid();
472 
473     /* Clean inherited cached thread tid. */
474     task->thread->tid = 0;
475 
476     nxt_debug(task, "daemon");
477 
478     /* Detach from controlling terminal. */
479 
480     if (setsid() == -1) {
481         nxt_alert(task, "setsid() failed %E", nxt_errno);
482         return NXT_ERROR;
483     }
484 
485     /*
486      * Reset file mode creation mask: any access
487      * rights can be set on file creation.
488      */
489     umask(0);
490 
491     /* Redirect STDIN and STDOUT to the "/dev/null". */
492 
493     fd = open("/dev/null", O_RDWR);
494     if (fd == -1) {
495         msg = "open(\"/dev/null\") failed %E";
496         goto fail;
497     }
498 
499     if (dup2(fd, STDIN_FILENO) == -1) {
500         msg = "dup2(\"/dev/null\", STDIN) failed %E";
501         goto fail;
502     }
503 
504     if (dup2(fd, STDOUT_FILENO) == -1) {
505         msg = "dup2(\"/dev/null\", STDOUT) failed %E";
506         goto fail;
507     }
508 
509     if (fd > STDERR_FILENO) {
510         nxt_fd_close(fd);
511     }
512 
513     return NXT_OK;
514 
515 fail:
516 
517     nxt_alert(task, msg, nxt_errno);
518 
519     if (fd != -1) {
520         nxt_fd_close(fd);
521     }
522 
523     return NXT_ERROR;
524 }
525 
526 
527 void
528 nxt_nanosleep(nxt_nsec_t ns)
529 {
530     struct timespec  ts;
531 
532     ts.tv_sec = ns / 1000000000;
533     ts.tv_nsec = ns % 1000000000;
534 
535     (void) nanosleep(&ts, NULL);
536 }
537 
538 
539 nxt_int_t
540 nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group)
541 {
542     struct group   *grp;
543     struct passwd  *pwd;
544 
545     nxt_errno = 0;
546 
547     pwd = getpwnam(uc->user);
548 
549     if (nxt_slow_path(pwd == NULL)) {
550 
551         if (nxt_errno == 0) {
552             nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found",
553                       uc->user, uc->user);
554         } else {
555             nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno);
556         }
557 
558         return NXT_ERROR;
559     }
560 
561     uc->uid = pwd->pw_uid;
562     uc->base_gid = pwd->pw_gid;
563 
564     if (group != NULL && group[0] != '\0') {
565         nxt_errno = 0;
566 
567         grp = getgrnam(group);
568 
569         if (nxt_slow_path(grp == NULL)) {
570 
571             if (nxt_errno == 0) {
572                 nxt_alert(task,
573                           "getgrnam(\"%s\") failed, group \"%s\" not found",
574                           group, group);
575             } else {
576                 nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno);
577             }
578 
579             return NXT_ERROR;
580         }
581 
582         uc->base_gid = grp->gr_gid;
583     }
584 
585     return nxt_user_groups_get(task, uc);
586 }
587 
588 
589 /*
590  * nxt_user_groups_get() stores an array of groups IDs which should be
591  * set by the initgroups() function for a given user.  The initgroups()
592  * may block a just forked worker process for some time if LDAP or NDIS+
593  * is used, so nxt_user_groups_get() allows to get worker user groups in
594  * main process.  In a nutshell the initgroups() calls getgrouplist()
595  * followed by setgroups().  However Solaris lacks the getgrouplist().
596  * Besides getgrouplist() does not allow to query the exact number of
597  * groups while NGROUPS_MAX can be quite large (e.g. 65536 on Linux).
598  * So nxt_user_groups_get() emulates getgrouplist(): at first the function
599  * saves the super-user groups IDs, then calls initgroups() and saves the
600  * specified user groups IDs, and then restores the super-user groups IDs.
601  * This works at least on Linux, FreeBSD, and Solaris, but does not work
602  * on MacOSX, getgroups(2):
603  *
604  *   To provide compatibility with applications that use getgroups() in
605  *   environments where users may be in more than {NGROUPS_MAX} groups,
606  *   a variant of getgroups(), obtained when compiling with either the
607  *   macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can
608  *   be used that is not limited to {NGROUPS_MAX} groups.  However, this
609  *   variant only returns the user's default group access list and not
610  *   the group list modified by a call to setgroups(2).
611  *
612  * For such cases initgroups() is used in worker process as fallback.
613  */
614 
615 static nxt_int_t
616 nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc)
617 {
618     int        nsaved, ngroups;
619     nxt_int_t  ret;
620     nxt_gid_t  *saved;
621 
622     nsaved = getgroups(0, NULL);
623 
624     if (nsaved == -1) {
625         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
626         return NXT_ERROR;
627     }
628 
629     nxt_debug(task, "getgroups(0, NULL): %d", nsaved);
630 
631     if (nsaved > NGROUPS_MAX) {
632         /* MacOSX case. */
633 
634         uc->gids = NULL;
635         uc->ngroups = 0;
636 
637         return NXT_OK;
638     }
639 
640     saved = nxt_malloc(nsaved * sizeof(nxt_gid_t));
641 
642     if (saved == NULL) {
643         return NXT_ERROR;
644     }
645 
646     ret = NXT_ERROR;
647 
648     nsaved = getgroups(nsaved, saved);
649 
650     if (nsaved == -1) {
651         nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno);
652         goto free;
653     }
654 
655     nxt_debug(task, "getgroups(): %d", nsaved);
656 
657     if (initgroups(uc->user, uc->base_gid) != 0) {
658         if (nxt_errno == NXT_EPERM) {
659             nxt_log(task, NXT_LOG_NOTICE,
660                     "initgroups(%s, %d) failed %E, ignored",
661                     uc->user, uc->base_gid, nxt_errno);
662 
663             ret = NXT_OK;
664 
665             goto free;
666 
667         } else {
668             nxt_alert(task, "initgroups(%s, %d) failed %E",
669                       uc->user, uc->base_gid, nxt_errno);
670             goto restore;
671         }
672     }
673 
674     ngroups = getgroups(0, NULL);
675 
676     if (ngroups == -1) {
677         nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno);
678         goto restore;
679     }
680 
681     nxt_debug(task, "getgroups(0, NULL): %d", ngroups);
682 
683     uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t));
684 
685     if (uc->gids == NULL) {
686         goto restore;
687     }
688 
689     ngroups = getgroups(ngroups, uc->gids);
690 
691     if (ngroups == -1) {
692         nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno);
693         goto restore;
694     }
695 
696     uc->ngroups = ngroups;
697 
698 #if (NXT_DEBUG)
699     {
700         u_char      *p, *end;
701         nxt_uint_t  i;
702         u_char      msg[NXT_MAX_ERROR_STR];
703 
704         p = msg;
705         end = msg + NXT_MAX_ERROR_STR;
706 
707         for (i = 0; i < uc->ngroups; i++) {
708             p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]);
709         }
710 
711         nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s",
712                   uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid,
713                   p - msg, msg);
714     }
715 #endif
716 
717     ret = NXT_OK;
718 
719 restore:
720 
721     if (setgroups(nsaved, saved) != 0) {
722         nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno);
723         ret = NXT_ERROR;
724     }
725 
726 free:
727 
728     nxt_free(saved);
729 
730     return ret;
731 }
732 
733 
734 nxt_int_t
735 nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
736 {
737     nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
738               uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid);
739 
740     if (setgid(uc->base_gid) != 0) {
741         nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno);
742         return NXT_ERROR;
743     }
744 
745     if (uc->gids != NULL) {
746         if (setgroups(uc->ngroups, uc->gids) != 0) {
747             nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno);
748             return NXT_ERROR;
749         }
750 
751     } else {
752         /* MacOSX fallback. */
753         if (initgroups(uc->user, uc->base_gid) != 0) {
754             nxt_alert(task, "initgroups(%s, %d) failed %E",
755                       uc->user, uc->base_gid, nxt_errno);
756             return NXT_ERROR;
757         }
758     }
759 
760     if (setuid(uc->uid) != 0) {
761         nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno);
762         return NXT_ERROR;
763     }
764 
765     return NXT_OK;
766 }
767 
768 
/*
 * Attaches "port" to "process": the port must not belong to a process yet
 * (asserted), it is appended to the tail of the process port queue, and
 * nxt_process_use() is called with a delta of 1 for the process.
 */
void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
    nxt_assert(port->process == NULL);

    port->process = process;
    nxt_queue_insert_tail(&process->ports, &port->link);

    nxt_process_use(task, process, 1);
}
779 
780 
781 nxt_process_type_t
782 nxt_process_type(nxt_process_t *process)
783 {
784     return nxt_queue_is_empty(&process->ports) ? 0 :
785         (nxt_process_port_first(process))->type;
786 }
787 
788 
/*
 * Walks the ports of the process with nxt_process_port_each() and, for
 * each of them, closes the port and removes it from the runtime.
 */
void
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
{
    nxt_port_t  *port;

    nxt_process_port_each(process, port) {

        nxt_port_close(task, port);

        nxt_runtime_port_remove(task, port);

    } nxt_process_port_loop;
}
802 
803 
/*
 * Adds "port" to the connected_ports hash of the process while holding
 * the process cp_mutex.
 */
void
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
{
    nxt_thread_mutex_lock(&process->cp_mutex);

    nxt_port_hash_add(&process->connected_ports, port);

    nxt_thread_mutex_unlock(&process->cp_mutex);
}
813 
814 
/*
 * Removes "port" from the connected_ports hash of the process while
 * holding the process cp_mutex.
 */
void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{
    nxt_thread_mutex_lock(&process->cp_mutex);

    nxt_port_hash_remove(&process->connected_ports, port);

    nxt_thread_mutex_unlock(&process->cp_mutex);
}
824 
825 
826 nxt_port_t *
827 nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
828     nxt_port_id_t port_id)
829 {
830     nxt_port_t  *res;
831 
832     nxt_thread_mutex_lock(&process->cp_mutex);
833 
834     res = nxt_port_hash_find(&process->connected_ports, pid, port_id);
835 
836     nxt_thread_mutex_unlock(&process->cp_mutex);
837 
838     return res;
839 }
840