xref: /unit/src/nxt_port_socket.c (revision 1553:c3fad601f58b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
11     nxt_port_send_msg_t *msg);
12 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
13 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
14 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
15 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
16     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
17 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
18     nxt_port_send_msg_t *msg);
19 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
20 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
21     nxt_port_recv_msg_t *msg);
22 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
23 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
24 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
25 
26 
/*
 * Creates the port's socketpair and tunes the kernel send/receive buffer
 * sizes so that a message of up to max_size bytes can be sent without
 * fragmenting more than necessary.  Returns NXT_OK on success, NXT_ERROR
 * otherwise; on failure any created descriptors are closed.
 */
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
    nxt_int_t     sndbuf, rcvbuf, size;
    nxt_socket_t  snd, rcv;

    port->socket.task = task;

    port->pair[0] = -1;
    port->pair[1] = -1;

    if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
        goto socketpair_fail;
    }

    /* pair[1] is the sending end, pair[0] the receiving end. */
    snd = port->pair[1];

    sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(sndbuf < 0)) {
        goto getsockopt_fail;
    }

    rcv = port->pair[0];

    rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcvbuf < 0)) {
        goto getsockopt_fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) sndbuf < max_size) {
        /*
         * On Unix domain sockets
         *   Linux uses 224K on both send and receive directions;
         *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         *   on send direction and 4K buffer size on receive direction;
         *   Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
                                     max_size);

        /* Re-read: the kernel may clamp or round the requested size. */
        sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(sndbuf < 0)) {
            goto getsockopt_fail;
        }

        /* Size the receive side to hold several in-flight messages. */
        size = sndbuf * 4;

        if (rcvbuf < size) {
            (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
                                         size);

            rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
            if (nxt_slow_path(rcvbuf < 0)) {
                goto getsockopt_fail;
            }
        }
    }

    /* A single message never exceeds what the socket can actually take. */
    port->max_size = nxt_min(max_size, (size_t) sndbuf);
    port->max_share = (64 * 1024);

    return NXT_OK;

getsockopt_fail:

    nxt_socket_close(task, port->pair[0]);
    nxt_socket_close(task, port->pair[1]);

socketpair_fail:

    return NXT_ERROR;
}
103 
104 
/*
 * Releases the port's resources: closes the currently active socket
 * descriptor and destroys the port's memory pool.
 * NOTE(review): only socket.fd is closed here; the other pair descriptor
 * is presumably closed via nxt_port_read_close()/nxt_port_write_close()
 * before this runs — confirm against callers.
 */
void
nxt_port_destroy(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}
111 
112 
113 void
114 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
115 {
116     port->socket.fd = port->pair[1];
117     port->socket.log = &nxt_main_log;
118     port->socket.write_ready = 1;
119 
120     port->engine = task->thread->engine;
121 
122     port->socket.write_work_queue = &port->engine->fast_work_queue;
123     port->socket.write_handler = nxt_port_write_handler;
124     port->socket.error_handler = nxt_port_error_handler;
125 }
126 
127 
/* Closes the sending end of the socketpair and marks it as unused. */
void
nxt_port_write_close(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->pair[1]);
    port->pair[1] = -1;
}
134 
135 
136 static void
137 nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
138 {
139     if (msg->allocated) {
140         nxt_free(msg);
141     }
142 }
143 
144 
/*
 * Builds a port message on the stack and either sends it right away
 * (fast path: socket writable and queue empty) or queues a heap copy
 * for the write handler.  "type" combines the message type with the
 * NXT_PORT_MSG_* flag bits.  Returns NXT_OK or NXT_ERROR.
 */
nxt_int_t
nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
    void *tracking)
{
    nxt_int_t            res;
    nxt_port_send_msg_t  msg;

    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.share = 0;            /* bytes sent so far toward max_share */
    msg.fd = fd;
    msg.fd2 = -1;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.allocated = 0;        /* lives on this stack frame */

    if (tracking != NULL) {
        nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
    }

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;      /* "not first fragment" flag */
    msg.port_msg.mf = 0;      /* "more fragments follow" flag */
    msg.port_msg.tracking = tracking != NULL;

    /* NXT_DECLINED means "not queued — send in place". */
    res = nxt_port_msg_chk_insert(task, port, &msg);
    if (nxt_fast_path(res == NXT_DECLINED)) {
        nxt_port_write_handler(task, &port->socket, &msg);
        res = NXT_OK;
    }

    return res;
}
185 
186 
/*
 * Decides between immediate and deferred delivery.  Returns NXT_DECLINED
 * when the socket is write-ready and nothing is queued, so the caller can
 * send in place; otherwise copies the stack message to the heap, appends
 * it to the queue and returns NXT_OK (NXT_ERROR on allocation failure).
 */
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg)
{
    nxt_int_t  res;

    nxt_thread_mutex_lock(&port->write_mutex);

    if (nxt_fast_path(port->socket.write_ready
                      && nxt_queue_is_empty(&port->messages)))
    {
        res = NXT_DECLINED;

    } else {
        /* Queueing requires a heap copy; msg points into the caller stack. */
        msg = nxt_port_msg_alloc(msg);

        if (nxt_fast_path(msg != NULL)) {
            nxt_queue_insert_tail(&port->messages, &msg->link);
            /* The queue holds a reference on the port. */
            nxt_port_use(task, port, 1);
            res = NXT_OK;

        } else {
            res = NXT_ERROR;
        }
    }

    nxt_thread_mutex_unlock(&port->write_mutex);

    return res;
}
217 
218 
219 static nxt_port_send_msg_t *
220 nxt_port_msg_alloc(nxt_port_send_msg_t *m)
221 {
222     nxt_port_send_msg_t  *msg;
223 
224     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
225     if (nxt_slow_path(msg == NULL)) {
226         return NULL;
227     }
228 
229     *msg = *m;
230 
231     msg->allocated = 1;
232 
233     return msg;
234 }
235 
236 
/* Posted callback: stops write events for the port's socket. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}
242 
243 
/* Posted callback: (re)arms write events for the port's socket. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}
249 
250 
/*
 * Sends as many pending messages as the socket allows.  Invoked either
 * as the fd write-event handler (data == NULL: drain the queue) or
 * directly from nxt_port_socket_twrite() (data points to the caller's
 * stack-allocated message).  Large buffers may be relayed through
 * shared memory; oversized chains are split into fragments.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;    /* net change to apply to the port refcount at exit */

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Queue drained: no need for further write events. */
                block_write = 1;
                goto cleanup;
            }
        }

next_fragment:

        /* iov[0] always carries the fixed-size message header. */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /* Payload travels via shared memory, so lift the byte limit. */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        if (msg->port_msg.tracking) {
            iov[0].iov_len += sizeof(msg->tracking_msg);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        msg->port_msg.last |= sb.last;
        /* "more fragments" if the chain did not fit into this write. */
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, &msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            /* The descriptors were delivered; close our copies if asked. */
            if (msg->fd != -1 && msg->close_fd != 0) {
                nxt_fd_close(msg->fd);

                msg->fd = -1;
            }

            if (msg->fd2 != -1 && msg->close_fd != 0) {
                nxt_fd_close(msg->fd2);

                msg->fd2 = -1;
            }

            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd = -1;
                msg->fd2 = -1;
                msg->share += n;
                msg->port_msg.nf = 1;
                msg->port_msg.tracking = 0;

                if (msg->share >= port->max_share) {
                    /* Fair-share limit hit: rotate to the queue tail. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Message fully sent: unlink (if queued) and release. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            if (data != NULL) {
                /* Direct-call mode handles exactly one message. */
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                goto fail;
            }

            /* NXT_AGAIN: park the message until the socket is writable. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    /* Keep the port referenced until the posted error handler runs. */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
460 
461 
462 static nxt_port_send_msg_t *
463 nxt_port_msg_first(nxt_port_t *port)
464 {
465     nxt_queue_link_t     *lnk;
466     nxt_port_send_msg_t  *msg;
467 
468     nxt_thread_mutex_lock(&port->write_mutex);
469 
470     lnk = nxt_queue_first(&port->messages);
471 
472     if (lnk == nxt_queue_tail(&port->messages)) {
473         msg = NULL;
474 
475     } else {
476         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
477     }
478 
479     nxt_thread_mutex_unlock(&port->write_mutex);
480 
481     return msg;
482 }
483 
484 
/*
 * Walks the buffer chain accounting for "sent" bytes: fully transmitted
 * buffers are completed onto the work queue, while the first partially
 * sent buffer has its positions advanced and terminates the walk.
 * Returns the remaining (unsent) part of the chain, or NULL when the
 * whole chain was consumed.
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {

        nxt_prefetch(b->next);

        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    /* Nothing left to account: the rest is unsent. */
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                if (sent < size) {
                    /* Partial send: advance positions and stop here. */

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        /* Buffer fully handled (or sync): schedule its completion. */
        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}
547 
548 
/*
 * Appends the message to the port's pending queue, first copying it to
 * the heap if it still lives on the caller's stack.  Returns the queued
 * (possibly newly allocated) message, or NULL on allocation failure.
 */
static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
{
    if (msg->allocated == 0) {
        msg = nxt_port_msg_alloc(msg);

        if (nxt_slow_path(msg == NULL)) {
            return NULL;
        }
    }

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_insert_tail(&port->messages, &msg->link);

    nxt_thread_mutex_unlock(&port->write_mutex);

    return msg;
}
568 
569 
570 void
571 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
572 {
573     port->socket.fd = port->pair[0];
574     port->socket.log = &nxt_main_log;
575 
576     port->engine = task->thread->engine;
577 
578     port->socket.read_work_queue = &port->engine->fast_work_queue;
579     port->socket.read_handler = nxt_port_read_handler;
580     port->socket.error_handler = nxt_port_error_handler;
581 
582     nxt_fd_event_enable_read(port->engine, &port->socket);
583 }
584 
585 
/*
 * Closes the receiving end of the socketpair.  The event state is
 * cleared first so no further reads are attempted on the dead fd.
 */
void
nxt_port_read_close(nxt_port_t *port)
{
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;
    nxt_socket_close(port->socket.task, port->pair[0]);
    port->pair[0] = -1;
}
594 
595 
596 static void
597 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
598 {
599     ssize_t              n;
600     nxt_buf_t            *b;
601     nxt_port_t           *port;
602     struct iovec         iov[2];
603     nxt_port_recv_msg_t  msg;
604 
605     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
606 
607     nxt_assert(port->engine == task->thread->engine);
608 
609     for ( ;; ) {
610 
611         b = nxt_port_buf_alloc(port);
612 
613         if (nxt_slow_path(b == NULL)) {
614             /* TODO: disable event for some time */
615         }
616 
617         iov[0].iov_base = &msg.port_msg;
618         iov[0].iov_len = sizeof(nxt_port_msg_t);
619 
620         iov[1].iov_base = b->mem.pos;
621         iov[1].iov_len = port->max_size;
622 
623         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
624 
625         if (n > 0) {
626 
627             msg.buf = b;
628             msg.size = n;
629 
630             nxt_port_read_msg_process(task, port, &msg);
631 
632             /*
633              * To disable instant completion or buffer re-usage,
634              * handler should reset 'msg.buf'.
635              */
636             if (msg.buf == b) {
637                 nxt_port_buf_free(port, b);
638             }
639 
640             if (port->socket.read_ready) {
641                 continue;
642             }
643 
644             return;
645         }
646 
647         if (n == NXT_AGAIN) {
648             nxt_port_buf_free(port, b);
649 
650             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
651             return;
652         }
653 
654         /* n == 0 || n == NXT_ERROR */
655 
656         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
657                            nxt_port_error_handler, task, &port->socket, NULL);
658         return;
659     }
660 }
661 
662 
/* Hash key identifying a fragmented stream: (stream id, sender pid). */
typedef struct {
    uint32_t  stream;   /* stream id the fragments belong to */
    uint32_t  pid;      /* pid of the sending process */
} nxt_port_frag_key_t;
667 
668 
669 static nxt_int_t
670 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
671 {
672     nxt_port_recv_msg_t  *fmsg;
673     nxt_port_frag_key_t  *frag_key;
674 
675     fmsg = data;
676     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
677 
678     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
679         && frag_key->stream == fmsg->port_msg.stream
680         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
681     {
682         return NXT_OK;
683     }
684 
685     return NXT_DECLINED;
686 }
687 
688 
/*
 * lvlhsh allocator callback: ctx is the port's memory pool; the block
 * is aligned to its own size.
 */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}
694 
695 
/* lvlhsh free callback: returns the block to the port's memory pool. */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}
701 
702 
/* lvlhsh operations table for the per-port fragmented-stream hash. */
static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};
709 
710 
/*
 * Registers the first fragment of a multi-part stream: copies the
 * message into the port's memory pool and inserts it into the frags
 * hash keyed by (stream, pid).  Returns the stored copy, or NULL on
 * allocation failure or a duplicate stream.
 */
static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_recv_msg_t  *fmsg;
    nxt_port_frag_key_t  frag_key;

    nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);

    fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));

    if (nxt_slow_path(fmsg == NULL)) {
        return NULL;
    }

    *fmsg = *msg;

    frag_key.stream = fmsg->port_msg.stream;
    frag_key.pid = fmsg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.replace = 0;
    lhq.value = fmsg;
    lhq.pool = port->mem_pool;

    res = nxt_lvlhsh_insert(&port->frags, &lhq);

    switch (res) {

    case NXT_OK:
        return fmsg;

    case NXT_DECLINED:
        /* An entry for this (stream, pid) already exists. */
        nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
                fmsg->port_msg.stream);
        nxt_mp_free(port->mem_pool, fmsg);

        return NULL;

    default:
        nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
                fmsg->port_msg.stream);

        nxt_mp_free(port->mem_pool, fmsg);

        return NULL;

    }
}
765 
766 
/*
 * Looks up the accumulated fragment state for (stream, pid).  For the
 * last fragment (mf == 0) the entry is also removed from the hash.
 * Returns NULL when the stream is unknown.
 */
static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_bool_t           last;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_frag_key_t  frag_key;

    last = msg->port_msg.mf == 0;

    nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
              msg->port_msg.stream);

    frag_key.stream = msg->port_msg.stream;
    frag_key.pid = msg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.pool = port->mem_pool;

    /* Delete removes the entry while still returning it in lhq.value. */
    res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
          nxt_lvlhsh_find(&port->frags, &lhq);

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
                frag_key.stream);

        return NULL;
    }
}
804 
805 
/*
 * Dispatches one received message: validates the header, reassembles
 * fragmented streams via the frags hash, optionally pulls the payload
 * out of shared memory, and finally invokes port->handler.  Ownership
 * of msg->buf and the received descriptors moves with the message; the
 * handler may take a buffer by resetting msg->buf.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        if (msg->fd != -1) {
            nxt_fd_close(msg->fd);
        }

        if (msg->fd2 != -1) {
            nxt_fd_close(msg->fd2);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    if (msg->port_msg.tracking) {
        /* Tracking read returning 0 means the stream was cancelled. */
        msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;

    } else {
        msg->cancelled = 0;
    }

    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        /* Non-first fragment: append it to the accumulated stream. */
        fmsg = nxt_port_frag_find(task, port, msg);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                /* Last fragment: hand the full chain to the handler. */
                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd = fmsg->fd;
                msg->fd2 = fmsg->fd2;

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            /* First fragment of a multi-part stream: start accumulating. */
            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Buffer/fd ownership moved into fmsg. */
                msg->buf = NULL;
                msg->fd = -1;
                msg->fd2 = -1;
                b = NULL;

            } else {
                if (msg->fd != -1) {
                    nxt_fd_close(msg->fd);
                }

                if (msg->fd2 != -1) {
                    nxt_fd_close(msg->fd2);
                }
            }
        } else {
            /* Unfragmented message: deliver directly. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                    b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
955 
956 
957 static nxt_buf_t *
958 nxt_port_buf_alloc(nxt_port_t *port)
959 {
960     nxt_buf_t  *b;
961 
962     if (port->free_bufs != NULL) {
963         b = port->free_bufs;
964         port->free_bufs = b->next;
965 
966         b->mem.pos = b->mem.start;
967         b->mem.free = b->mem.start;
968         b->next = NULL;
969     } else {
970         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
971         if (nxt_slow_path(b == NULL)) {
972             return NULL;
973         }
974     }
975 
976     return b;
977 }
978 
979 
/* Returns a buffer chain to the port's free list for later reuse. */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}
986 
987 
/*
 * Socket error handler: drains the pending-message queue, closing any
 * descriptors still attached to queued messages, completing their
 * buffer chains, and dropping the queue's references on the port.
 * When data == obj the write handler took an extra port reference
 * before posting us, which is released here as well.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    /* Release the extra reference taken on the write handler fail path. */
    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        if (msg->fd != -1 && msg->close_fd != 0) {
            nxt_fd_close(msg->fd);

            msg->fd = -1;
        }

        if (msg->fd2 != -1 && msg->close_fd != 0) {
            nxt_fd_close(msg->fd2);

            msg->fd2 = -1;
        }

        /* Complete every non-sync buffer in the unsent chain. */
        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
1050