xref: /unit/src/nxt_port_socket.c (revision 551:220b0834790b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
#include <nxt_main.h>

#include <string.h>
8 
9 
10 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
11 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
12 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
13     nxt_port_recv_msg_t *msg);
14 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
15 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
16 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
17 
18 
/*
 * Creates the socketpair that backs a port and tunes its kernel buffer
 * sizes.  port->pair[0] is the receive end, port->pair[1] the send end.
 * max_size (0 means "use 16K") caps a single message; the clamped value
 * is stored in port->max_size and later sizes receive buffers and limits
 * plain (non-mmap) writes.  Returns NXT_OK or NXT_ERROR; on error any
 * created descriptors are closed again.
 */
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
    nxt_int_t     sndbuf, rcvbuf, size;
    nxt_socket_t  snd, rcv;

    port->socket.task = task;

    /* Mark both ends invalid so a failed create leaves consistent state. */
    port->pair[0] = -1;
    port->pair[1] = -1;

    if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
        goto socketpair_fail;
    }

    snd = port->pair[1];

    sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(sndbuf < 0)) {
        goto getsockopt_fail;
    }

    rcv = port->pair[0];

    rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcvbuf < 0)) {
        goto getsockopt_fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) sndbuf < max_size) {
        /*
         * On Unix domain sockets
         *   Linux uses 224K on both send and receive directions;
         *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         *   on send direction and 4K buffer size on receive direction;
         *   Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
                                     max_size);

        /* Re-read: the kernel may clamp or round the requested value. */
        sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(sndbuf < 0)) {
            goto getsockopt_fail;
        }

        /* Keep the receive side several times larger than the send side. */
        size = sndbuf * 4;

        if (rcvbuf < size) {
            (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
                                         size);

            rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
            if (nxt_slow_path(rcvbuf < 0)) {
                goto getsockopt_fail;
            }
        }
    }

    /* A single message can never exceed what the send buffer accepts. */
    port->max_size = nxt_min(max_size, (size_t) sndbuf);
    port->max_share = (64 * 1024);

    return NXT_OK;

getsockopt_fail:

    nxt_socket_close(task, port->pair[0]);
    nxt_socket_close(task, port->pair[1]);

socketpair_fail:

    return NXT_ERROR;
}
95 
96 
97 void
98 nxt_port_destroy(nxt_port_t *port)
99 {
100     nxt_socket_close(port->socket.task, port->socket.fd);
101     nxt_mp_destroy(port->mem_pool);
102 }
103 
104 
/*
 * Prepares the port for sending: the send end of the pair becomes the
 * event socket, write handlers are installed on the current thread's
 * engine, and the iovec/mmsg scratch arrays are lazily allocated.
 */
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;

    port->engine = task->thread->engine;

    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;

    if (port->iov == NULL) {
        /*
         * Scratch space sized for the mmap path of the write handler,
         * which may build up to NXT_IOBUF_MAX * 10 iovec entries.
         * NOTE(review): nxt_mp_get() results are not checked; if either
         * allocation fails, nxt_port_write_handler() would dereference a
         * NULL port->iov — confirm allocation failure aborts earlier.
         */
        port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) *
                               NXT_IOBUF_MAX * 10);
        port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 *
                                    NXT_IOBUF_MAX * 10);
    }
}
125 
126 
127 void
128 nxt_port_write_close(nxt_port_t *port)
129 {
130     nxt_socket_close(port->socket.task, port->pair[1]);
131     port->pair[1] = -1;
132 }
133 
134 
/*
 * Work handler that frees a queued send message.  'obj' is the
 * nxt_port_send_msg_t; 'data' is the event engine whose memory pool the
 * message was allocated from.  When invoked on a different engine's
 * thread, the work is re-posted to the owning engine so the pool is
 * touched only by its own thread.
 */
static void
nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t   *engine;
    nxt_port_send_msg_t  *msg;

    msg = obj;
    engine = data;

#if (NXT_DEBUG)
    /* The work's data must be the engine recorded at creation time. */
    if (nxt_slow_path(data != msg->work.data)) {
        nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
                      data, msg->work.data);
        nxt_abort();
    }
#endif

    if (engine != task->thread->engine) {

        nxt_debug(task, "current thread is %PT, expected %PT",
                  task->thread->tid, engine->task.thread->tid);

        /* Bounce the release to the owning engine's thread. */
        nxt_event_engine_post(engine, &msg->work);

        return;
    }

    /* Matches nxt_mp_alloc()/nxt_mp_retain() in nxt_port_msg_create(). */
    nxt_mp_free(engine->mem_pool, obj);
    nxt_mp_release(engine->mem_pool);
}
165 
166 
167 static nxt_port_send_msg_t *
168 nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
169 {
170     nxt_mp_t             *mp;
171     nxt_port_send_msg_t  *msg;
172 
173     mp = task->thread->engine->mem_pool;
174 
175     msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t));
176     if (nxt_slow_path(msg == NULL)) {
177         return NULL;
178     }
179 
180     nxt_mp_retain(mp);
181 
182     msg->link.next = NULL;
183     msg->link.prev = NULL;
184 
185     msg->buf = m->buf;
186     msg->fd = m->fd;
187     msg->close_fd = m->close_fd;
188     msg->port_msg = m->port_msg;
189 
190     msg->work.next = NULL;
191     msg->work.handler = nxt_port_release_send_msg;
192     msg->work.task = task;
193     msg->work.obj = msg;
194     msg->work.data = task->thread->engine;
195 
196     return msg;
197 }
198 
199 
200 static nxt_port_send_msg_t *
201 nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
202 {
203     if (msg->work.data == NULL) {
204         msg = nxt_port_msg_create(task, msg);
205     }
206 
207     if (msg != NULL) {
208         nxt_queue_insert_tail(&port->messages, &msg->link);
209     }
210 
211     return msg;
212 }
213 
214 
215 static nxt_port_send_msg_t *
216 nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
217 {
218     nxt_queue_link_t  *lnk;
219 
220     lnk = nxt_queue_first(&port->messages);
221 
222     if (lnk == nxt_queue_tail(&port->messages)) {
223         return msg;
224     }
225 
226     return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
227 }
228 
229 
/*
 * Sends or queues a message through the port.  'type' combines the
 * message type (NXT_PORT_MSG_MASK bits) with the NXT_PORT_MSG_LAST and
 * NXT_PORT_MSG_CLOSE_FD flags; 'fd' is an optional descriptor sent with
 * the first fragment; 'b' is the payload buffer chain; 'tracking', when
 * non-NULL, enables cancellation tracking.  If the socket is
 * write-ready the message is sent inline from this thread; otherwise a
 * heap copy is queued under write_mutex and the port use count grows by
 * one.  Returns NXT_OK, or NXT_ERROR if queueing the copy failed.
 */
nxt_int_t
nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
    void *tracking)
{
    nxt_port_send_msg_t  msg, *res;

    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.fd = fd;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.share = 0;

    if (tracking != NULL) {
        nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
    }

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;
    msg.port_msg.mf = 0;
    msg.port_msg.tracking = tracking != NULL;

    /* NULL work.data marks the message as caller-owned (on this stack). */
    msg.work.data = NULL;

    if (port->socket.write_ready) {
        nxt_port_write_handler(task, &port->socket, &msg);
    } else {
        nxt_thread_mutex_lock(&port->write_mutex);

        /* Copies the stack message into engine memory and queues it. */
        res = nxt_port_msg_push(task, port, &msg);

        nxt_thread_mutex_unlock(&port->write_mutex);

        if (res == NULL) {
            return NXT_ERROR;
        }

        nxt_port_use(task, port, 1);
    }

    return NXT_OK;
}
279 
280 
281 static void
282 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
283 {
284     nxt_fd_event_block_write(task->thread->engine, &port->socket);
285 }
286 
287 
288 static void
289 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
290 {
291     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
292 }
293 
294 
/*
 * Write event handler.  'obj' is &port->socket; 'data' is either NULL
 * (event-driven call) or a caller-owned nxt_port_send_msg_t passed by
 * nxt_port_socket_twrite() for an inline send.  Drains queued messages
 * (and the inline one) while the socket stays write-ready, switching
 * large payloads to mmap transfer.  The whole send loop runs under
 * write_mutex; event state changes and use-count updates are applied
 * after the mutex is released.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            *iov;
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;

    nxt_thread_mutex_lock(&port->write_mutex);

    iov = port->iov;

    wq = &task->thread->engine->fast_work_queue;

    do {
        /* Queued messages take priority over the inline one in 'data'. */
        msg = nxt_port_msg_first(task, port, data);

        if (msg == NULL) {
            /* Nothing left to send: stop polling for writability. */
            block_write = 1;
            goto unlock_mutex;
        }

        /* iov[0] carries the fixed header (plus tracking data, if any). */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /* Payload goes via shared memory; only small descriptors
             * pass through the socket, so limits can be relaxed. */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        if (msg->port_msg.tracking) {
            iov[0].iov_len += sizeof(msg->tracking_msg);
        }

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        /* 'mf' (more fragments) marks a buffer chain that did not fit. */
        msg->port_msg.last |= sb.last;
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            /* The socketpair must accept the whole message atomically. */
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_log(task, NXT_LOG_CRIT,
                        "port %d: short write: %z instead of %uz",
                        port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            if (msg->fd != -1 && msg->close_fd != 0) {
                nxt_fd_close(msg->fd);

                msg->fd = -1;
            }

            msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
                                              m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd = -1;
                msg->share += n;
                msg->port_msg.nf = 1;
                msg->port_msg.tracking = 0;

                if (msg->share >= port->max_share) {
                    /* Fairness: re-queue this stream at the tail so other
                     * pending messages get a turn. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_queue_remove(&msg->link);
                        use_delta--;
                    }
                    data = NULL;

                    if (nxt_port_msg_push(task, port, msg) != NULL) {
                        use_delta++;
                    }
                }

            } else {
                /* Message fully sent: release it if it was queued. */
                if (msg->link.next != NULL) {
                    nxt_queue_remove(&msg->link);
                    use_delta--;
                    nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
                                       msg->work.data);
                }
                data = NULL;
            }

        } else if (nxt_slow_path(n == NXT_ERROR)) {
            /* Keep the message queued so the error path completes it. */
            if (msg->link.next == NULL) {
                if (nxt_port_msg_push(task, port, msg) != NULL) {
                    use_delta++;
                }
            }
            goto fail;
        }

        /* n == NXT_AGAIN */

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto unlock_mutex;

fail:

    /* Extra reference keeps the port alive until the error handler runs. */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

unlock_mutex:
    nxt_thread_mutex_unlock(&port->write_mutex);

    /* Event state changes are posted to the owning engine off-mutex. */
    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
470 
471 
472 void
473 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
474 {
475     port->socket.fd = port->pair[0];
476     port->socket.log = &nxt_main_log;
477 
478     port->engine = task->thread->engine;
479 
480     port->socket.read_work_queue = &port->engine->fast_work_queue;
481     port->socket.read_handler = nxt_port_read_handler;
482     port->socket.error_handler = nxt_port_error_handler;
483 
484     nxt_fd_event_enable_read(port->engine, &port->socket);
485 }
486 
487 
488 void
489 nxt_port_read_close(nxt_port_t *port)
490 {
491     port->socket.read_ready = 0;
492     nxt_socket_close(port->socket.task, port->pair[0]);
493     port->pair[0] = -1;
494 }
495 
496 
497 static void
498 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
499 {
500     ssize_t              n;
501     nxt_buf_t            *b;
502     nxt_port_t           *port;
503     struct iovec         iov[2];
504     nxt_port_recv_msg_t  msg;
505 
506     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
507 
508     nxt_assert(port->engine == task->thread->engine);
509 
510     for ( ;; ) {
511 
512         b = nxt_port_buf_alloc(port);
513 
514         if (nxt_slow_path(b == NULL)) {
515             /* TODO: disable event for some time */
516         }
517 
518         iov[0].iov_base = &msg.port_msg;
519         iov[0].iov_len = sizeof(nxt_port_msg_t);
520 
521         iov[1].iov_base = b->mem.pos;
522         iov[1].iov_len = port->max_size;
523 
524         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
525 
526         if (n > 0) {
527 
528             msg.buf = b;
529             msg.size = n;
530 
531             nxt_port_read_msg_process(task, port, &msg);
532 
533             /*
534              * To disable instant completion or buffer re-usage,
535              * handler should reset 'msg.buf'.
536              */
537             if (msg.buf == b) {
538                 nxt_port_buf_free(port, b);
539             }
540 
541             if (port->socket.read_ready) {
542                 continue;
543             }
544 
545             return;
546         }
547 
548         if (n == NXT_AGAIN) {
549             nxt_port_buf_free(port, b);
550 
551             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
552             return;
553         }
554 
555         /* n == 0 || n == NXT_ERROR */
556 
557         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
558                            nxt_port_error_handler, task, &port->socket, NULL);
559         return;
560     }
561 }
562 
563 
564 static nxt_int_t
565 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
566 {
567     nxt_port_recv_msg_t  *fmsg;
568 
569     fmsg = data;
570 
571     if (lhq->key.length == sizeof(uint32_t)
572         && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
573     {
574         return NXT_OK;
575     }
576 
577     return NXT_DECLINED;
578 }
579 
580 
581 static void *
582 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
583 {
584     return nxt_mp_alloc(ctx, size);
585 }
586 
587 
588 static void
589 nxt_port_lvlhsh_frag_free(void *ctx, void *p)
590 {
591     nxt_mp_free(ctx, p);
592 }
593 
594 
/*
 * Hash table operations for the per-port fragmented-message registry
 * (port->frags): entries are keyed by the 32-bit stream id and
 * allocated from the port's memory pool.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};
601 
602 
603 static nxt_port_recv_msg_t *
604 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
605     nxt_port_recv_msg_t *msg)
606 {
607     nxt_int_t            res;
608     nxt_lvlhsh_query_t   lhq;
609     nxt_port_recv_msg_t  *fmsg;
610 
611     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
612 
613     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
614 
615     if (nxt_slow_path(fmsg == NULL)) {
616         return NULL;
617     }
618 
619     *fmsg = *msg;
620 
621     lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
622     lhq.key.length = sizeof(uint32_t);
623     lhq.key.start = (u_char *) &fmsg->port_msg.stream;
624     lhq.proto = &lvlhsh_frag_proto;
625     lhq.replace = 0;
626     lhq.value = fmsg;
627     lhq.pool = port->mem_pool;
628 
629     res = nxt_lvlhsh_insert(&port->frags, &lhq);
630 
631     switch (res) {
632 
633     case NXT_OK:
634         return fmsg;
635 
636     case NXT_DECLINED:
637         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
638                 fmsg->port_msg.stream);
639         nxt_mp_free(port->mem_pool, fmsg);
640 
641         return NULL;
642 
643     default:
644         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
645                 fmsg->port_msg.stream);
646 
647         nxt_mp_free(port->mem_pool, fmsg);
648 
649         return NULL;
650 
651     }
652 }
653 
654 
655 static nxt_port_recv_msg_t *
656 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
657     nxt_bool_t last)
658 {
659     nxt_int_t          res;
660     nxt_lvlhsh_query_t lhq;
661 
662     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
663 
664     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
665     lhq.key.length = sizeof(uint32_t);
666     lhq.key.start = (u_char *) &stream;
667     lhq.proto = &lvlhsh_frag_proto;
668     lhq.pool = port->mem_pool;
669 
670     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
671           nxt_lvlhsh_find(&port->frags, &lhq);
672 
673     switch (res) {
674 
675     case NXT_OK:
676         return lhq.value;
677 
678     default:
679         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream);
680 
681         return NULL;
682     }
683 }
684 
685 
/*
 * Dispatches a received message to port->handler, reassembling
 * fragmented streams (nf = "not first" fragment, mf = "more fragments
 * follow") and resolving mmap payloads.  'msg' is stack-allocated by
 * the read handler; a handler may take ownership of msg->buf by
 * resetting it.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b;
    nxt_port_recv_msg_t  *fmsg;

    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_log(task, NXT_LOG_CRIT,
                "port %d: too small message:%uz", port->socket.fd, msg->size);

        /* Do not leak a descriptor attached to a malformed message. */
        if (msg->fd != -1) {
            nxt_fd_close(msg->fd);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    if (msg->port_msg.tracking) {
        /* A zero tracking read marks the stream as cancelled. */
        msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;

    } else {
        msg->cancelled = 0;
    }

    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        /* Continuation fragment: append to the registered stream;
         * the final fragment (mf == 0) also removes the entry. */
        fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
                                  msg->port_msg.mf == 0);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            /* Buffer ownership moved into the registry entry. */
            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            /* Last fragment: deliver the whole reassembled chain. */
            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd = fmsg->fd;
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            /* First fragment of a multi-part stream: register it. */
            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            /* The stored copy represents the stream head, not a fragment. */
            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Ownership of buf and fd moved into fmsg. */
                msg->buf = NULL;
                msg->fd = -1;
                b = NULL;

            } else {
                if (msg->fd != -1) {
                    nxt_fd_close(msg->fd);
                }
            }
        } else {
            /* Unfragmented message: deliver directly. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    /* 'b != orig_b' means nxt_port_mmap_read() replaced the chain. */
    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            for (; b != NULL; b = b->next) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                    b->completion_handler, task, b, b->parent);
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
814 
815 
816 static nxt_buf_t *
817 nxt_port_buf_alloc(nxt_port_t *port)
818 {
819     nxt_buf_t  *b;
820 
821     if (port->free_bufs != NULL) {
822         b = port->free_bufs;
823         port->free_bufs = b->next;
824 
825         b->mem.pos = b->mem.start;
826         b->mem.free = b->mem.start;
827         b->next = NULL;
828     } else {
829         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
830         if (nxt_slow_path(b == NULL)) {
831             return NULL;
832         }
833     }
834 
835     return b;
836 }
837 
838 
839 static void
840 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
841 {
842     b->next = port->free_bufs;
843     port->free_bufs = b;
844 }
845 
846 
/*
 * Error handler for both directions of the port socket.  Completes
 * every buffer of every queued outgoing message and schedules release
 * of the messages themselves.  When invoked from the write handler's
 * fail path, data == obj and the extra use-count reference taken there
 * is dropped here.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    /* Matches the use_delta++ in nxt_port_write_handler()'s fail path. */
    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        /* Sync buffers carry no payload and need no completion. */
        for (b = msg->buf; b != NULL; b = b->next) {
            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        /* Drop the queue's reference and free the message later. */
        nxt_queue_remove(&msg->link);
        use_delta--;
        nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
                           msg->work.data);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
894