xref: /unit/src/nxt_port_socket.c (revision 1269:41331471eee7)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
11     nxt_port_send_msg_t *msg);
12 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
13 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
14 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
15 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
16     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
17 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
18     nxt_port_send_msg_t *msg);
19 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
20 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
21     nxt_port_recv_msg_t *msg);
22 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
23 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
24 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
25 
26 
27 nxt_int_t
28 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
29 {
30     nxt_int_t     sndbuf, rcvbuf, size;
31     nxt_socket_t  snd, rcv;
32 
33     port->socket.task = task;
34 
35     port->pair[0] = -1;
36     port->pair[1] = -1;
37 
38     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
39         goto socketpair_fail;
40     }
41 
42     snd = port->pair[1];
43 
44     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
45     if (nxt_slow_path(sndbuf < 0)) {
46         goto getsockopt_fail;
47     }
48 
49     rcv = port->pair[0];
50 
51     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
52     if (nxt_slow_path(rcvbuf < 0)) {
53         goto getsockopt_fail;
54     }
55 
56     if (max_size == 0) {
57         max_size = 16 * 1024;
58     }
59 
60     if ((size_t) sndbuf < max_size) {
61         /*
62          * On Unix domain sockets
63          *   Linux uses 224K on both send and receive directions;
64          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
65          *   on send direction and 4K buffer size on receive direction;
66          *   Solaris uses 16K on send direction and 5K on receive direction.
67          */
68         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
69                                      max_size);
70 
71         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
72         if (nxt_slow_path(sndbuf < 0)) {
73             goto getsockopt_fail;
74         }
75 
76         size = sndbuf * 4;
77 
78         if (rcvbuf < size) {
79             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
80                                          size);
81 
82             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
83             if (nxt_slow_path(rcvbuf < 0)) {
84                 goto getsockopt_fail;
85             }
86         }
87     }
88 
89     port->max_size = nxt_min(max_size, (size_t) sndbuf);
90     port->max_share = (64 * 1024);
91 
92     return NXT_OK;
93 
94 getsockopt_fail:
95 
96     nxt_socket_close(task, port->pair[0]);
97     nxt_socket_close(task, port->pair[1]);
98 
99 socketpair_fail:
100 
101     return NXT_ERROR;
102 }
103 
104 
/* Releases a port: closes its active descriptor and destroys its pool. */
void
nxt_port_destroy(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}
111 
112 
/*
 * Prepares a port for sending: binds the socket to the write end of the
 * pair and installs the write/error handlers on the current engine.
 * Note the socket is optimistically marked write-ready; the first send
 * will discover the real state.
 */
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;

    port->engine = task->thread->engine;

    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;
}
126 
127 
/* Closes the write end of the pair and marks it unused. */
void
nxt_port_write_close(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->pair[1]);
    port->pair[1] = -1;
}
134 
135 
136 static void
137 nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
138 {
139     if (msg->allocated) {
140         nxt_free(msg);
141     }
142 }
143 
144 
145 nxt_int_t
146 nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
147     nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
148     void *tracking)
149 {
150     nxt_int_t            res;
151     nxt_port_send_msg_t  msg;
152 
153     msg.link.next = NULL;
154     msg.link.prev = NULL;
155 
156     msg.buf = b;
157     msg.share = 0;
158     msg.fd = fd;
159     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
160     msg.allocated = 0;
161 
162     if (tracking != NULL) {
163         nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
164     }
165 
166     msg.port_msg.stream = stream;
167     msg.port_msg.pid = nxt_pid;
168     msg.port_msg.reply_port = reply_port;
169     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
170     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
171     msg.port_msg.mmap = 0;
172     msg.port_msg.nf = 0;
173     msg.port_msg.mf = 0;
174     msg.port_msg.tracking = tracking != NULL;
175 
176     res = nxt_port_msg_chk_insert(task, port, &msg);
177     if (nxt_fast_path(res == NXT_DECLINED)) {
178         nxt_port_write_handler(task, &port->socket, &msg);
179         res = NXT_OK;
180     }
181 
182     return res;
183 }
184 
185 
186 static nxt_int_t
187 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
188     nxt_port_send_msg_t *msg)
189 {
190     nxt_int_t  res;
191 
192     nxt_thread_mutex_lock(&port->write_mutex);
193 
194     if (nxt_fast_path(port->socket.write_ready
195                       && nxt_queue_is_empty(&port->messages)))
196     {
197         res = NXT_DECLINED;
198 
199     } else {
200         msg = nxt_port_msg_alloc(msg);
201 
202         if (nxt_fast_path(msg != NULL)) {
203             nxt_queue_insert_tail(&port->messages, &msg->link);
204             nxt_port_use(task, port, 1);
205             res = NXT_OK;
206 
207         } else {
208             res = NXT_ERROR;
209         }
210     }
211 
212     nxt_thread_mutex_unlock(&port->write_mutex);
213 
214     return res;
215 }
216 
217 
218 static nxt_port_send_msg_t *
219 nxt_port_msg_alloc(nxt_port_send_msg_t *m)
220 {
221     nxt_port_send_msg_t  *msg;
222 
223     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
224     if (nxt_slow_path(msg == NULL)) {
225         return NULL;
226     }
227 
228     *msg = *m;
229 
230     msg->allocated = 1;
231 
232     return msg;
233 }
234 
235 
/* Posted to the port's engine: stop polling the socket for writability. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}
241 
242 
/* Posted to the port's engine: resume polling the socket for writability. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}
248 
249 
/*
 * Drains queued messages (or the single direct message passed via
 * 'data') to the port socket.  Large payloads may be moved through
 * shared memory; payloads exceeding the per-write limit are split into
 * fragments, and a stream that has sent max_share bytes is rotated to
 * the queue tail so other streams get a turn.  'obj' is the port's
 * nxt_fd_event_t embedded in nxt_port_t.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;       /* net change of port references; applied once */

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            /* Direct write of the caller's stack-based message. */
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Queue drained; no need to watch for writability. */
                block_write = 1;
                goto cleanup;
            }
        }

next_fragment:

        /* iov[0] carries the fixed header, iov[1..] the payload. */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /* Shared-memory transfer: only mmap handles cross the socket. */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        if (msg->port_msg.tracking) {
            iov[0].iov_len += sizeof(msg->tracking_msg);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        /* mf set means "more fragments follow" for this stream. */
        msg->port_msg.last |= sb.last;
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            /* Datagram semantics: a partial write is a protocol error. */
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            if (msg->fd != -1 && msg->close_fd != 0) {
                nxt_fd_close(msg->fd);

                msg->fd = -1;
            }

            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd = -1;
                msg->share += n;
                msg->port_msg.nf = 1;
                msg->port_msg.tracking = 0;

                if (msg->share >= port->max_share) {
                    /* Fairness: yield the socket to other queued streams. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        /* Already queued: rotate to the tail. */
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        /* Direct message: copy to heap and queue it. */
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Whole message sent: dequeue and release it. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            if (data != NULL) {
                /* Direct-write mode handles exactly one message. */
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                goto fail;
            }

            /* NXT_AGAIN: park the message on the queue until writable. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    /* The error handler drops one reference; pre-compensate here. */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    /* Event changes are posted so they run on the port's own engine. */
    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
452 
453 
454 static nxt_port_send_msg_t *
455 nxt_port_msg_first(nxt_port_t *port)
456 {
457     nxt_queue_link_t     *lnk;
458     nxt_port_send_msg_t  *msg;
459 
460     nxt_thread_mutex_lock(&port->write_mutex);
461 
462     lnk = nxt_queue_first(&port->messages);
463 
464     if (lnk == nxt_queue_tail(&port->messages)) {
465         msg = NULL;
466 
467     } else {
468         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
469     }
470 
471     nxt_thread_mutex_unlock(&port->write_mutex);
472 
473     return msg;
474 }
475 
476 
/*
 * Consumes 'sent' bytes from the head of buffer chain 'b'.  Fully-sent
 * buffers get their completion handlers queued on 'wq' and are unlinked;
 * a partially-sent buffer has its position advanced and stops the walk.
 * Returns the first buffer with unsent data, or NULL if the chain is
 * fully consumed.  'mmap_mode' marks shared-memory buffers whose bucket
 * release is now the receiver's responsibility.
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {

        nxt_prefetch(b->next);

        /* Sync buffers carry no payload; complete them unconditionally. */
        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    /* Nothing left to account; this buffer is unsent. */
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                if (sent < size) {
                    /* Partial send: advance positions and keep buffer. */

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        /* Buffer fully consumed: hand it to its completion handler. */
        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}
539 
540 
541 static nxt_port_send_msg_t *
542 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
543 {
544     if (msg->allocated == 0) {
545         msg = nxt_port_msg_alloc(msg);
546 
547         if (nxt_slow_path(msg == NULL)) {
548             return NULL;
549         }
550     }
551 
552     nxt_thread_mutex_lock(&port->write_mutex);
553 
554     nxt_queue_insert_tail(&port->messages, &msg->link);
555 
556     nxt_thread_mutex_unlock(&port->write_mutex);
557 
558     return msg;
559 }
560 
561 
/*
 * Prepares a port for receiving: binds the socket to the read end of
 * the pair, installs the read/error handlers on the current engine,
 * and starts polling for readability.
 */
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[0];
    port->socket.log = &nxt_main_log;

    port->engine = task->thread->engine;

    port->socket.read_work_queue = &port->engine->fast_work_queue;
    port->socket.read_handler = nxt_port_read_handler;
    port->socket.error_handler = nxt_port_error_handler;

    nxt_fd_event_enable_read(port->engine, &port->socket);
}
576 
577 
/*
 * Closes the read end of the pair.  Read state is cleared first so a
 * pending read event cannot fire against the closed descriptor.
 */
void
nxt_port_read_close(nxt_port_t *port)
{
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;
    nxt_socket_close(port->socket.task, port->pair[0]);
    port->pair[0] = -1;
}
586 
587 
588 static void
589 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
590 {
591     ssize_t              n;
592     nxt_buf_t            *b;
593     nxt_port_t           *port;
594     struct iovec         iov[2];
595     nxt_port_recv_msg_t  msg;
596 
597     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
598 
599     nxt_assert(port->engine == task->thread->engine);
600 
601     for ( ;; ) {
602 
603         b = nxt_port_buf_alloc(port);
604 
605         if (nxt_slow_path(b == NULL)) {
606             /* TODO: disable event for some time */
607         }
608 
609         iov[0].iov_base = &msg.port_msg;
610         iov[0].iov_len = sizeof(nxt_port_msg_t);
611 
612         iov[1].iov_base = b->mem.pos;
613         iov[1].iov_len = port->max_size;
614 
615         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
616 
617         if (n > 0) {
618 
619             msg.buf = b;
620             msg.size = n;
621 
622             nxt_port_read_msg_process(task, port, &msg);
623 
624             /*
625              * To disable instant completion or buffer re-usage,
626              * handler should reset 'msg.buf'.
627              */
628             if (msg.buf == b) {
629                 nxt_port_buf_free(port, b);
630             }
631 
632             if (port->socket.read_ready) {
633                 continue;
634             }
635 
636             return;
637         }
638 
639         if (n == NXT_AGAIN) {
640             nxt_port_buf_free(port, b);
641 
642             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
643             return;
644         }
645 
646         /* n == 0 || n == NXT_ERROR */
647 
648         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
649                            nxt_port_error_handler, task, &port->socket, NULL);
650         return;
651     }
652 }
653 
654 
/* Hash key identifying a fragmented stream: stream id plus sender pid. */
typedef struct {
    uint32_t  stream;
    uint32_t  pid;
} nxt_port_frag_key_t;
659 
660 
661 static nxt_int_t
662 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
663 {
664     nxt_port_recv_msg_t  *fmsg;
665     nxt_port_frag_key_t  *frag_key;
666 
667     fmsg = data;
668     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
669 
670     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
671         && frag_key->stream == fmsg->port_msg.stream
672         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
673     {
674         return NXT_OK;
675     }
676 
677     return NXT_DECLINED;
678 }
679 
680 
/* lvlhsh allocator: size-aligned allocation from the port's pool (ctx). */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}
686 
687 
/* lvlhsh deallocator: returns memory to the port's pool (ctx). */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}
693 
694 
/* lvlhsh protocol for the per-port fragment table (port->frags). */
static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};
701 
702 
703 static nxt_port_recv_msg_t *
704 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
705     nxt_port_recv_msg_t *msg)
706 {
707     nxt_int_t            res;
708     nxt_lvlhsh_query_t   lhq;
709     nxt_port_recv_msg_t  *fmsg;
710     nxt_port_frag_key_t  frag_key;
711 
712     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
713 
714     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
715 
716     if (nxt_slow_path(fmsg == NULL)) {
717         return NULL;
718     }
719 
720     *fmsg = *msg;
721 
722     frag_key.stream = fmsg->port_msg.stream;
723     frag_key.pid = fmsg->port_msg.pid;
724 
725     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
726     lhq.key.length = sizeof(nxt_port_frag_key_t);
727     lhq.key.start = (u_char *) &frag_key;
728     lhq.proto = &lvlhsh_frag_proto;
729     lhq.replace = 0;
730     lhq.value = fmsg;
731     lhq.pool = port->mem_pool;
732 
733     res = nxt_lvlhsh_insert(&port->frags, &lhq);
734 
735     switch (res) {
736 
737     case NXT_OK:
738         return fmsg;
739 
740     case NXT_DECLINED:
741         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
742                 fmsg->port_msg.stream);
743         nxt_mp_free(port->mem_pool, fmsg);
744 
745         return NULL;
746 
747     default:
748         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
749                 fmsg->port_msg.stream);
750 
751         nxt_mp_free(port->mem_pool, fmsg);
752 
753         return NULL;
754 
755     }
756 }
757 
758 
759 static nxt_port_recv_msg_t *
760 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
761 {
762     nxt_int_t            res;
763     nxt_bool_t           last;
764     nxt_lvlhsh_query_t   lhq;
765     nxt_port_frag_key_t  frag_key;
766 
767     last = msg->port_msg.mf == 0;
768 
769     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
770               msg->port_msg.stream);
771 
772     frag_key.stream = msg->port_msg.stream;
773     frag_key.pid = msg->port_msg.pid;
774 
775     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
776     lhq.key.length = sizeof(nxt_port_frag_key_t);
777     lhq.key.start = (u_char *) &frag_key;
778     lhq.proto = &lvlhsh_frag_proto;
779     lhq.pool = port->mem_pool;
780 
781     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
782           nxt_lvlhsh_find(&port->frags, &lhq);
783 
784     switch (res) {
785 
786     case NXT_OK:
787         return lhq.value;
788 
789     default:
790         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
791                 frag_key.stream);
792 
793         return NULL;
794     }
795 }
796 
797 
/*
 * Dispatches one received message.  Handles four shapes, driven by the
 * nf ("not first") and mf ("more fragments") header bits:
 *   nf=0 mf=0 — complete message: deliver to port->handler;
 *   nf=0 mf=1 — first fragment: start accumulating in port->frags;
 *   nf=1 mf=1 — middle fragment: append to the accumulated chain;
 *   nf=1 mf=0 — last fragment: append, deliver the whole chain, free it.
 * Shared-memory payloads are expanded via nxt_port_mmap_read() and, if
 * the handler did not take ownership (msg->buf left unchanged), their
 * buffers are completed here before the original recv buffer is put
 * back into msg->buf for the caller to recycle.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    /* A message shorter than the fixed header is malformed. */
    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        if (msg->fd != -1) {
            nxt_fd_close(msg->fd);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    if (msg->port_msg.tracking) {
        /* A cancelled tracked request is parsed but not delivered. */
        msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;

    } else {
        msg->cancelled = 0;
    }

    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        /* Continuation fragment: find the accumulating head message. */
        fmsg = nxt_port_frag_find(task, port, msg);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            /* Ownership of msg->buf moves to the accumulated chain. */
            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                /* Last fragment: deliver the complete chain. */
                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd = fmsg->fd;

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            /* Stream finished (or cancelled): drop the head message. */
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            /* First fragment of a multi-part message. */
            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            /* The stored head is delivered later as a complete message. */
            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* fmsg now owns the buffer and the passed fd. */
                msg->buf = NULL;
                msg->fd = -1;
                b = NULL;

            } else {
                if (msg->fd != -1) {
                    nxt_fd_close(msg->fd);
                }
            }
        } else {
            /* Complete, unfragmented message. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                    b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
937 
938 
939 static nxt_buf_t *
940 nxt_port_buf_alloc(nxt_port_t *port)
941 {
942     nxt_buf_t  *b;
943 
944     if (port->free_bufs != NULL) {
945         b = port->free_bufs;
946         port->free_bufs = b->next;
947 
948         b->mem.pos = b->mem.start;
949         b->mem.free = b->mem.start;
950         b->next = NULL;
951     } else {
952         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
953         if (nxt_slow_path(b == NULL)) {
954             return NULL;
955         }
956     }
957 
958     return b;
959 }
960 
961 
/* Returns a buffer chain to the port's free list for later reuse. */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}
968 
969 
/*
 * Fatal port error: discards every queued outgoing message, completing
 * their non-sync buffers so upstream producers are notified, and drops
 * the port references held by the queue.  When 'data' == 'obj' an
 * extra reference (taken by the failed write path) is also released.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        /* Complete each payload buffer; sync buffers carry no data. */
        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
1020