xref: /unit/src/nxt_port_socket.c (revision 1907:75ddb2d89b42)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_port_queue.h>
9 #include <nxt_port_memory_int.h>
10 
11 
12 #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
13           (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
14 
15 
16 static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
17 static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
18     void *qbuf, nxt_buf_t *b);
19 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
20     nxt_port_send_msg_t *msg);
21 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
22 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
23 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
24 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
25     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
26 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
27     nxt_port_send_msg_t *msg);
28 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
29 static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
32     nxt_port_recv_msg_t *msg);
33 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
34 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
35 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
36 
37 
38 nxt_int_t
39 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
40 {
41     nxt_int_t     sndbuf, rcvbuf, size;
42     nxt_socket_t  snd, rcv;
43 
44     port->socket.task = task;
45 
46     port->pair[0] = -1;
47     port->pair[1] = -1;
48 
49     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
50         goto socketpair_fail;
51     }
52 
53     snd = port->pair[1];
54 
55     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
56     if (nxt_slow_path(sndbuf < 0)) {
57         goto getsockopt_fail;
58     }
59 
60     rcv = port->pair[0];
61 
62     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
63     if (nxt_slow_path(rcvbuf < 0)) {
64         goto getsockopt_fail;
65     }
66 
67     if (max_size == 0) {
68         max_size = 16 * 1024;
69     }
70 
71     if ((size_t) sndbuf < max_size) {
72         /*
73          * On Unix domain sockets
74          *   Linux uses 224K on both send and receive directions;
75          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
76          *   on send direction and 4K buffer size on receive direction;
77          *   Solaris uses 16K on send direction and 5K on receive direction.
78          */
79         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
80                                      max_size);
81 
82         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
83         if (nxt_slow_path(sndbuf < 0)) {
84             goto getsockopt_fail;
85         }
86 
87         size = sndbuf * 4;
88 
89         if (rcvbuf < size) {
90             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
91                                          size);
92 
93             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
94             if (nxt_slow_path(rcvbuf < 0)) {
95                 goto getsockopt_fail;
96             }
97         }
98     }
99 
100     port->max_size = nxt_min(max_size, (size_t) sndbuf);
101     port->max_share = (64 * 1024);
102 
103     return NXT_OK;
104 
105 getsockopt_fail:
106 
107     nxt_socket_close(task, port->pair[0]);
108     nxt_socket_close(task, port->pair[1]);
109 
110 socketpair_fail:
111 
112     return NXT_ERROR;
113 }
114 
115 
void
nxt_port_destroy(nxt_port_t *port)
{
    /* Release the port's kernel descriptor and its memory pool. */
    nxt_socket_close(port->socket.task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}
122 
123 
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    /*
     * Bind the write side of the socketpair (pair[1]) to the calling
     * thread's engine and install the write/error handlers.  The socket
     * is assumed writable until a send returns NXT_AGAIN.
     */
    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;

    port->engine = task->thread->engine;

    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;
}
137 
138 
void
nxt_port_write_close(nxt_port_t *port)
{
    /* Close the write end of the pair and mark it unused. */
    nxt_socket_close(port->socket.task, port->pair[1]);
    port->pair[1] = -1;
}
145 
146 
147 static void
148 nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
149 {
150     if (msg->allocated) {
151         nxt_free(msg);
152     }
153 }
154 
155 
/*
 * Send a message (header, optional fds, optional buffer chain) to a port.
 * When the port has a shared-memory queue, small or mmap-backed payloads
 * are enqueued directly; otherwise the message falls back to the socket
 * path via nxt_port_write_handler().  Returns NXT_OK, NXT_AGAIN or
 * NXT_ERROR.
 */
nxt_int_t
nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
    nxt_buf_t *b)
{
    int                  notify;
    uint8_t              qmsg_size;
    nxt_int_t            res;
    nxt_port_send_msg_t  msg;
    struct {
        nxt_port_msg_t   pm;
        uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
    } qmsg;

    /* Stack-allocated message; heap-copied only if it must be queued. */
    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.share = 0;
    msg.fd[0] = fd;
    msg.fd[1] = fd2;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.allocated = 0;

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;
    msg.port_msg.mf = 0;

    if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {

        /* File descriptors cannot travel through the shared queue. */
        if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
            qmsg.pm = msg.port_msg;

            qmsg_size = sizeof(qmsg.pm);

            if (b != NULL) {
                qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
            }

            res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) qmsg_size, notify, res);

            if (b != NULL && nxt_fast_path(res == NXT_OK)) {
                if (qmsg.pm.mmap) {
                    /* The peer now owns the shared-memory chunk. */
                    b->is_port_mmap_sent = 1;
                }

                /* Mark the buffer fully consumed and complete it. */
                b->mem.pos = b->mem.free;

                nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                                   b->completion_handler, task, b, b->parent);
            }

            /* notify != 0: the reader must be woken through the socket. */
            if (notify == 0) {
                return res;
            }

            msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
            msg.buf = NULL;

        } else {
            /* Tell the reader to pick this message up from the socket. */
            qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;

            res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      notify, res);

            if (nxt_slow_path(res == NXT_AGAIN)) {
                return NXT_AGAIN;
            }
        }
    }

    /* NXT_DECLINED means the socket is idle: write inline right now. */
    res = nxt_port_msg_chk_insert(task, port, &msg);
    if (nxt_fast_path(res == NXT_DECLINED)) {
        nxt_port_write_handler(task, &port->socket, &msg);
        res = NXT_OK;
    }

    return res;
}
247 
248 
249 static nxt_bool_t
250 nxt_port_can_enqueue_buf(nxt_buf_t *b)
251 {
252     if (b == NULL) {
253         return 1;
254     }
255 
256     if (b->next != NULL) {
257         return 0;
258     }
259 
260     return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
261             || nxt_buf_is_port_mmap(b));
262 }
263 
264 
265 static uint8_t
266 nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
267     nxt_buf_t *b)
268 {
269     ssize_t                  size;
270     nxt_port_mmap_msg_t      *mm;
271     nxt_port_mmap_header_t   *hdr;
272     nxt_port_mmap_handler_t  *mmap_handler;
273 
274     size = nxt_buf_mem_used_size(&b->mem);
275 
276     if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
277         nxt_memcpy(qbuf, b->mem.pos, size);
278 
279         return size;
280     }
281 
282     mmap_handler = b->parent;
283     hdr = mmap_handler->hdr;
284     mm = qbuf;
285 
286     mm->mmap_id = hdr->id;
287     mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
288     mm->size = nxt_buf_mem_used_size(&b->mem);
289 
290     pm->mmap = 1;
291 
292     nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
293               mm->size);
294 
295     return sizeof(nxt_port_mmap_msg_t);
296 }
297 
298 
/*
 * Under the write mutex, decide how to dispatch a message:
 *   NXT_DECLINED - socket writable and queue empty; caller writes inline;
 *   NXT_OK       - message copied to the heap and appended to the queue
 *                  (port use count incremented for the queued reference);
 *   NXT_ERROR    - heap allocation failed.
 */
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg)
{
    nxt_int_t  res;

    nxt_thread_mutex_lock(&port->write_mutex);

    if (nxt_fast_path(port->socket.write_ready
                      && nxt_queue_is_empty(&port->messages)))
    {
        res = NXT_DECLINED;

    } else {
        /* Copy the caller's stack message so it can outlive this call. */
        msg = nxt_port_msg_alloc(msg);

        if (nxt_fast_path(msg != NULL)) {
            nxt_queue_insert_tail(&port->messages, &msg->link);
            nxt_port_use(task, port, 1);
            res = NXT_OK;

        } else {
            res = NXT_ERROR;
        }
    }

    nxt_thread_mutex_unlock(&port->write_mutex);

    return res;
}
329 
330 
331 static nxt_port_send_msg_t *
332 nxt_port_msg_alloc(nxt_port_send_msg_t *m)
333 {
334     nxt_port_send_msg_t  *msg;
335 
336     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
337     if (nxt_slow_path(msg == NULL)) {
338         return NULL;
339     }
340 
341     *msg = *m;
342 
343     msg->allocated = 1;
344 
345     return msg;
346 }
347 
348 
/* Posted to the port's engine thread: stop polling for writability. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}
354 
355 
/* Posted to the port's engine thread: resume polling for writability. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}
361 
362 
/*
 * Drain the port's outgoing messages over the socketpair.  Invoked either
 * directly with 'data' pointing at a single stack message (inline write
 * from nxt_port_socket_write2()) or as the fd write-ready handler with
 * data == NULL, in which case messages are taken from port->messages.
 *
 * Large mmap-capable payloads are relocated into shared memory before
 * sending; long buffer chains are sent as fragments, re-queued after
 * port->max_share bytes to keep the port fair to other streams.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            /* Inline call: write exactly this one message. */
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Queue drained: no need to poll for writability. */
                block_write = 1;
                goto cleanup;
            }
        }

next_fragment:

        /* iov[0] always carries the fixed-size message header. */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /* Shared memory carries the payload; lift the socket limits. */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        /* mf marks "more fragments follow" for the receiver. */
        msg->port_msg.last |= sb.last;
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            /* Datagram semantics: a partial write is a protocol error. */
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            /* The kernel duplicated the fds; our copies can be closed. */
            if (msg->close_fd) {
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);

                    msg->fd[0] = -1;
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);

                    msg->fd[1] = -1;
                }
            }

            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                msg->share += n;
                msg->port_msg.nf = 1;

                if (msg->share >= port->max_share) {
                    /* Fairness: rotate this stream to the queue tail. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Whole chain sent: unlink and release the message. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            /* Inline call handles only its own message. */
            if (data != NULL) {
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                /* Unqueued message: clean up here before failing. */
                if (msg->link.next == NULL) {
                    if (msg->close_fd) {
                        if (msg->fd[0] != -1) {
                            nxt_fd_close(msg->fd[0]);

                            msg->fd[0] = -1;
                        }

                        if (msg->fd[1] != -1) {
                            nxt_fd_close(msg->fd[1]);

                            msg->fd[1] = -1;
                        }
                    }

                    nxt_port_release_send_msg(msg);
                }

                goto fail;
            }

            /* NXT_AGAIN: park the message until the socket is writable. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    /* Extra reference keeps the port alive for the error handler. */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    /* fd-event changes must run on the port's own engine thread. */
    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
587 
588 
589 static nxt_port_send_msg_t *
590 nxt_port_msg_first(nxt_port_t *port)
591 {
592     nxt_queue_link_t     *lnk;
593     nxt_port_send_msg_t  *msg;
594 
595     nxt_thread_mutex_lock(&port->write_mutex);
596 
597     lnk = nxt_queue_first(&port->messages);
598 
599     if (lnk == nxt_queue_tail(&port->messages)) {
600         msg = NULL;
601 
602     } else {
603         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
604     }
605 
606     nxt_thread_mutex_unlock(&port->write_mutex);
607 
608     return msg;
609 }
610 
611 
/*
 * Walk the buffer chain crediting 'sent' bytes against it.  Fully-sent
 * (and sync) buffers get their completion handlers scheduled on 'wq';
 * the first partially-sent or unsent buffer is returned so the caller
 * can continue the stream from it.  Returns NULL when the whole chain
 * was consumed.  'mmap_mode' marks shared-memory buffers as handed off
 * so the peer releases their chunks.
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {

        nxt_prefetch(b->next);

        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                /* Nothing left to credit: resume from this buffer. */
                if (sent == 0) {
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                /* Partially sent: advance positions and stop here. */
                if (sent < size) {

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        /* Buffer fully consumed (or sync): hand it to its completion. */
        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}
674 
675 
676 static nxt_port_send_msg_t *
677 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
678 {
679     if (msg->allocated == 0) {
680         msg = nxt_port_msg_alloc(msg);
681 
682         if (nxt_slow_path(msg == NULL)) {
683             return NULL;
684         }
685     }
686 
687     nxt_thread_mutex_lock(&port->write_mutex);
688 
689     nxt_queue_insert_tail(&port->messages, &msg->link);
690 
691     nxt_thread_mutex_unlock(&port->write_mutex);
692 
693     return msg;
694 }
695 
696 
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
    /*
     * Bind the read side of the socketpair (pair[0]) to this thread's
     * engine and start polling.  Ports backed by a shared-memory queue
     * use the queue-aware read handler.
     */
    port->socket.fd = port->pair[0];
    port->socket.log = &nxt_main_log;

    port->engine = task->thread->engine;

    port->socket.read_work_queue = &port->engine->fast_work_queue;
    port->socket.read_handler = port->queue != NULL
                                ? nxt_port_queue_read_handler
                                : nxt_port_read_handler;
    port->socket.error_handler = nxt_port_error_handler;

    nxt_fd_event_enable_read(port->engine, &port->socket);
}
713 
714 
void
nxt_port_read_close(nxt_port_t *port)
{
    /* Mark the read side inactive before closing its descriptor. */
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;
    nxt_socket_close(port->socket.task, port->pair[0]);
    port->pair[0] = -1;
}
723 
724 
725 static void
726 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
727 {
728     ssize_t              n;
729     nxt_buf_t            *b;
730     nxt_port_t           *port;
731     struct iovec         iov[2];
732     nxt_port_recv_msg_t  msg;
733 
734     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
735 
736     nxt_assert(port->engine == task->thread->engine);
737 
738     for ( ;; ) {
739 
740         b = nxt_port_buf_alloc(port);
741 
742         if (nxt_slow_path(b == NULL)) {
743             /* TODO: disable event for some time */
744         }
745 
746         iov[0].iov_base = &msg.port_msg;
747         iov[0].iov_len = sizeof(nxt_port_msg_t);
748 
749         iov[1].iov_base = b->mem.pos;
750         iov[1].iov_len = port->max_size;
751 
752         n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
753 
754         if (n > 0) {
755 
756             msg.buf = b;
757             msg.size = n;
758 
759             nxt_port_read_msg_process(task, port, &msg);
760 
761             /*
762              * To disable instant completion or buffer re-usage,
763              * handler should reset 'msg.buf'.
764              */
765             if (msg.buf == b) {
766                 nxt_port_buf_free(port, b);
767             }
768 
769             if (port->socket.read_ready) {
770                 continue;
771             }
772 
773             return;
774         }
775 
776         if (n == NXT_AGAIN) {
777             nxt_port_buf_free(port, b);
778 
779             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
780             return;
781         }
782 
783         /* n == 0 || n == NXT_ERROR */
784 
785         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
786                            nxt_port_error_handler, task, &port->socket, NULL);
787         return;
788     }
789 }
790 
791 
/*
 * Read-ready handler for ports with a shared-memory queue.  Messages are
 * dequeued from the queue when possible; a _NXT_PORT_MSG_READ_SOCKET
 * marker redirects to the socket, and messages read from the socket out
 * of order are suspended in port->socket_msg until the queue catches up.
 * queue->nitems tracks active readers to coordinate queue wakeups.
 */
static void
nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_port_queue_t     *queue;
    nxt_port_recv_msg_t  msg, *smsg;
    uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];

    port = nxt_container_of(obj, nxt_port_t, socket);
    msg.port = port;

    nxt_assert(port->engine == task->thread->engine);

    queue = port->queue;
    nxt_atomic_fetch_add(&queue->nitems, 1);

    for ( ;; ) {

        if (port->from_socket == 0) {
            n = nxt_port_queue_recv(queue, qmsg);

            if (n < 0 && !port->socket.read_ready) {
                /*
                 * Drop our reader count first, then re-check: a writer
                 * may have enqueued between the failed recv and the
                 * decrement.
                 */
                nxt_atomic_fetch_add(&queue->nitems, -1);

                n = nxt_port_queue_recv(queue, qmsg);
                if (n < 0) {
                    return;
                }

                nxt_atomic_fetch_add(&queue->nitems, 1);
            }

            /* Marker: the next message must be read from the socket. */
            if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
                port->from_socket++;

                nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          port->from_socket);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: dequeue %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

        } else {
            /* A previously suspended socket message takes priority. */
            if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
                msg.port_msg = smsg->port_msg;
                b = smsg->buf;
                n = smsg->size;
                msg.fd[0] = smsg->fd[0];
                msg.fd[1] = smsg->fd[1];

                /* size == 0 marks the suspend slot as free again. */
                smsg->size = 0;

                port->from_socket--;

                nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                goto process;
            }

            n = -1;
        }

        if (n < 0 && !port->socket.read_ready) {
            nxt_atomic_fetch_add(&queue->nitems, -1);
            return;
        }

        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /*
             * TODO: disable event for some time
             * NOTE(review): 'b' is dereferenced below even when NULL —
             * allocation failure would crash here; needs a guard.
             */
        }

        if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
            /* Complete message came through the queue: copy it out. */
            nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));

            if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
                nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
                           n - sizeof(nxt_port_msg_t));
            }

        } else {
            /* Nothing usable in the queue: read from the socket. */
            iov[0].iov_base = &msg.port_msg;
            iov[0].iov_len = sizeof(nxt_port_msg_t);

            iov[1].iov_base = b->mem.pos;
            iov[1].iov_len = port->max_size;

            n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);

            /* READ_QUEUE wakeups carry no payload; just loop again. */
            if (n == (ssize_t) sizeof(nxt_port_msg_t)
                && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
            {
                nxt_port_buf_free(port, b);

                nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

            if (n > 0) {
                if (port->from_socket == 0) {
                    /*
                     * Socket message arrived before its READ_SOCKET
                     * marker: suspend it to preserve message order.
                     */
                    nxt_debug(task, "port{%d,%d} %d: suspend message %d",
                              (int) port->pid, (int) port->id, port->socket.fd,
                              (int) n);

                    smsg = port->socket_msg;

                    if (nxt_slow_path(smsg == NULL)) {
                        /* Lazily allocate the single suspend slot. */
                        smsg = nxt_mp_alloc(port->mem_pool,
                                            sizeof(nxt_port_recv_msg_t));

                        if (nxt_slow_path(smsg == NULL)) {
                            nxt_alert(task, "port{%d,%d} %d: suspend message "
                                            "failed",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }

                        port->socket_msg = smsg;

                    } else {
                        /* Only one message may be suspended at a time. */
                        if (nxt_slow_path(smsg->size != 0)) {
                            nxt_alert(task, "port{%d,%d} %d: too many suspend "
                                            "messages",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }
                    }

                    smsg->port_msg = msg.port_msg;
                    smsg->buf = b;
                    smsg->size = n;
                    smsg->fd[0] = msg.fd[0];
                    smsg->fd[1] = msg.fd[1];

                    continue;
                }

                port->from_socket--;
            }
        }

    process:

        if (n > 0) {
            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            continue;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);

            continue;
        }

        /* n == 0 || n == NXT_ERROR */

        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}
988 
989 
/* Hash key identifying an in-progress fragmented stream: the stream id
 * qualified by the sending process id. */
typedef struct {
    uint32_t  stream;
    uint32_t  pid;
} nxt_port_frag_key_t;
994 
995 
996 static nxt_int_t
997 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
998 {
999     nxt_port_recv_msg_t  *fmsg;
1000     nxt_port_frag_key_t  *frag_key;
1001 
1002     fmsg = data;
1003     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1004 
1005     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
1006         && frag_key->stream == fmsg->port_msg.stream
1007         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1008     {
1009         return NXT_OK;
1010     }
1011 
1012     return NXT_DECLINED;
1013 }
1014 
1015 
/*
 * lvlhsh allocation callback; ctx is the port's memory pool.  Alignment
 * equals the allocation size — presumably because lvlhsh buckets are
 * power-of-two sized; verify against the lvlhsh implementation.
 */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}
1021 
1022 
/* lvlhsh free callback; ctx is the port's memory pool. */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}
1028 
1029 
1030 static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
1031     NXT_LVLHSH_DEFAULT,
1032     nxt_port_lvlhsh_frag_test,
1033     nxt_port_lvlhsh_frag_alloc,
1034     nxt_port_lvlhsh_frag_free,
1035 };
1036 
1037 
1038 static nxt_port_recv_msg_t *
1039 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1040     nxt_port_recv_msg_t *msg)
1041 {
1042     nxt_int_t            res;
1043     nxt_lvlhsh_query_t   lhq;
1044     nxt_port_recv_msg_t  *fmsg;
1045     nxt_port_frag_key_t  frag_key;
1046 
1047     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1048 
1049     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1050 
1051     if (nxt_slow_path(fmsg == NULL)) {
1052         return NULL;
1053     }
1054 
1055     *fmsg = *msg;
1056 
1057     frag_key.stream = fmsg->port_msg.stream;
1058     frag_key.pid = fmsg->port_msg.pid;
1059 
1060     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1061     lhq.key.length = sizeof(nxt_port_frag_key_t);
1062     lhq.key.start = (u_char *) &frag_key;
1063     lhq.proto = &lvlhsh_frag_proto;
1064     lhq.replace = 0;
1065     lhq.value = fmsg;
1066     lhq.pool = port->mem_pool;
1067 
1068     res = nxt_lvlhsh_insert(&port->frags, &lhq);
1069 
1070     switch (res) {
1071 
1072     case NXT_OK:
1073         return fmsg;
1074 
1075     case NXT_DECLINED:
1076         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1077                 fmsg->port_msg.stream);
1078         nxt_mp_free(port->mem_pool, fmsg);
1079 
1080         return NULL;
1081 
1082     default:
1083         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1084                 fmsg->port_msg.stream);
1085 
1086         nxt_mp_free(port->mem_pool, fmsg);
1087 
1088         return NULL;
1089 
1090     }
1091 }
1092 
1093 
1094 static nxt_port_recv_msg_t *
1095 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1096 {
1097     nxt_int_t            res;
1098     nxt_bool_t           last;
1099     nxt_lvlhsh_query_t   lhq;
1100     nxt_port_frag_key_t  frag_key;
1101 
1102     last = msg->port_msg.mf == 0;
1103 
1104     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
1105               msg->port_msg.stream);
1106 
1107     frag_key.stream = msg->port_msg.stream;
1108     frag_key.pid = msg->port_msg.pid;
1109 
1110     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1111     lhq.key.length = sizeof(nxt_port_frag_key_t);
1112     lhq.key.start = (u_char *) &frag_key;
1113     lhq.proto = &lvlhsh_frag_proto;
1114     lhq.pool = port->mem_pool;
1115 
1116     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1117           nxt_lvlhsh_find(&port->frags, &lhq);
1118 
1119     switch (res) {
1120 
1121     case NXT_OK:
1122         return lhq.value;
1123 
1124     default:
1125         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
1126                 frag_key.stream);
1127 
1128         return NULL;
1129     }
1130 }
1131 
1132 
/*
 * Dispatches one received port message: validates its size, reassembles
 * fragmented streams via port->frags, reads mmap-carried payloads, and
 * invokes port->handler on complete messages.  Finally completes mmap
 * buffers unless the handler claimed them by resetting msg->buf.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    /* A message shorter than its header is malformed: drop it, but close
     * any file descriptors that rode along so they do not leak. */
    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        if (msg->fd[0] != -1) {
            nxt_fd_close(msg->fd[0]);
        }

        if (msg->fd[1] != -1) {
            nxt_fd_close(msg->fd[1]);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    msg->cancelled = 0;

    /* nf != 0: continuation of an already started fragmented stream. */
    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        fmsg = nxt_port_frag_find(task, port, msg);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            /* Append this fragment's buffers to the accumulated chain;
             * ownership moves to fmsg, so detach them from msg. */
            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            /* mf == 0: this was the last fragment — deliver the whole
             * reassembled message to the port handler. */
            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                b = fmsg->buf;

                port->handler(task, fmsg);

                /* Copy back what the handler left, so the common
                 * completion logic below sees the handler's decision. */
                msg->buf = fmsg->buf;
                msg->fd[0] = fmsg->fd[0];
                msg->fd[1] = fmsg->fd[1];

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        /* The hash entry was already deleted by nxt_port_frag_find() for
         * the last fragment; free the stored copy here. */
        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        /* mf != 0 with nf == 0: first fragment of a new stream. */
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            /* The stored copy represents the whole logical message, so
             * it must not look like a fragment itself. */
            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Buffers and fds now belong to the stored fmsg. */
                msg->buf = NULL;
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                b = NULL;

            } else {
                /* Cancelled stream: release the carried descriptors. */
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);
                }
            }
        } else {
            /* Unfragmented message: deliver directly. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    /* 'b' differs from orig_b only when nxt_port_mmap_read() replaced the
     * read buffer with mmap-backed buffers that need completion. */
    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                    b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
1277 
1278 
1279 static nxt_buf_t *
1280 nxt_port_buf_alloc(nxt_port_t *port)
1281 {
1282     nxt_buf_t  *b;
1283 
1284     if (port->free_bufs != NULL) {
1285         b = port->free_bufs;
1286         port->free_bufs = b->next;
1287 
1288         b->mem.pos = b->mem.start;
1289         b->mem.free = b->mem.start;
1290         b->next = NULL;
1291     } else {
1292         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
1293         if (nxt_slow_path(b == NULL)) {
1294             return NULL;
1295         }
1296     }
1297 
1298     return b;
1299 }
1300 
1301 
/*
 * Returns buffer chain 'b' to the port's free list for reuse by
 * nxt_port_buf_alloc().  nxt_buf_chain_add() presumably appends the
 * current free list after 'b', which then becomes the new list head —
 * confirm against nxt_buf_chain_add()'s definition.
 */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}
1308 
1309 
/*
 * Socket error handler: drains the port's pending outgoing message queue
 * under write_mutex, closing carried file descriptors, scheduling buffer
 * completion handlers on the fast work queue, and releasing each message.
 * Accumulated reference-count changes are applied once at the end via
 * nxt_port_use().
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    /* obj == data: caller signalled one extra port reference to drop —
     * NOTE(review): verify against the call sites of this handler. */
    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    /* The message queue is shared with writers; hold write_mutex while
     * unlinking entries. */
    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        /* Close fds that were to be passed with this message, marking
         * them closed so later cleanup does not double-close. */
        if (msg->close_fd) {
            if (msg->fd[0] != -1) {
                nxt_fd_close(msg->fd[0]);

                msg->fd[0] = -1;
            }

            if (msg->fd[1] != -1) {
                nxt_fd_close(msg->fd[1]);

                msg->fd[1] = -1;
            }
        }

        /* Detach each buffer and hand it to its completion handler;
         * sync buffers have no payload to complete. */
        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        /* Each queued message held a port reference. */
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
1374