xref: /unit/src/nxt_port_socket.c (revision 2139:99d792169ffb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_socket_msg.h>
9 #include <nxt_port_queue.h>
10 #include <nxt_port_memory_int.h>
11 
12 
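/*
 * The largest payload that fits into a single port queue message
 * alongside the nxt_port_msg_t header.
 */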
13 #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
14           (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
15 
16 
17 static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
18 static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
19     void *qbuf, nxt_buf_t *b);
20 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
21     nxt_port_send_msg_t *msg);
22 static nxt_port_send_msg_t *nxt_port_msg_alloc(const nxt_port_send_msg_t *m);
23 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
24 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
25 nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg);
26 nxt_inline void nxt_port_close_fds(nxt_fd_t *fd);
27 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
28     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
29 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
30     nxt_port_send_msg_t *msg);
31 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
32 static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
33     void *data);
34 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
35     nxt_port_recv_msg_t *msg);
36 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
37 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
38 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
39 
40 
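/*
 * Creates the port's socket pair and, if the kernel defaults are smaller,
 * grows SO_SNDBUF towards max_size and SO_RCVBUF towards four times the
 * resulting send buffer; port->max_size is capped by the final SO_SNDBUF.
 */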
41 nxt_int_t
42 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
43 {
44     nxt_int_t     sndbuf, rcvbuf, size;
45     nxt_socket_t  snd, rcv;
46 
47     port->socket.task = task;
48 
49     port->pair[0] = -1;
50     port->pair[1] = -1;
51 
52     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
53         goto socketpair_fail;
54     }
55 
56     snd = port->pair[1];
57 
58     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
59     if (nxt_slow_path(sndbuf < 0)) {
60         goto getsockopt_fail;
61     }
62 
63     rcv = port->pair[0];
64 
65     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
66     if (nxt_slow_path(rcvbuf < 0)) {
67         goto getsockopt_fail;
68     }
69 
70     if (max_size == 0) {
71         max_size = 16 * 1024;
72     }
73 
74     if ((size_t) sndbuf < max_size) {
75         /*
76          * On Unix domain sockets
77          *   Linux uses 224K on both send and receive directions;
78          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
79          *   on send direction and 4K buffer size on receive direction;
80          *   Solaris uses 16K on send direction and 5K on receive direction.
81          */
82         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
83                                      max_size);
84 
85         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
86         if (nxt_slow_path(sndbuf < 0)) {
87             goto getsockopt_fail;
88         }
89 
90         size = sndbuf * 4;
91 
92         if (rcvbuf < size) {
93             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
94                                          size);
95 
96             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
97             if (nxt_slow_path(rcvbuf < 0)) {
98                 goto getsockopt_fail;
99             }
100         }
101     }
102 
103     port->max_size = nxt_min(max_size, (size_t) sndbuf);
104     port->max_share = (64 * 1024);
105 
106     return NXT_OK;
107 
108 getsockopt_fail:
109 
110     nxt_socket_close(task, port->pair[0]);
111     nxt_socket_close(task, port->pair[1]);
112 
113 socketpair_fail:
114 
115     return NXT_ERROR;
116 }
117 
118 
119 void
120 nxt_port_destroy(nxt_port_t *port)
121 {
122     nxt_socket_close(port->socket.task, port->socket.fd);
123     nxt_mp_destroy(port->mem_pool);
124 }
125 
126 
127 void
128 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
129 {
130     port->socket.fd = port->pair[1];
131     port->socket.log = &nxt_main_log;
132     port->socket.write_ready = 1;
133 
134     port->engine = task->thread->engine;
135 
136     port->socket.write_work_queue = &port->engine->fast_work_queue;
137     port->socket.write_handler = nxt_port_write_handler;
138     port->socket.error_handler = nxt_port_error_handler;
139 }
140 
141 
142 void
143 nxt_port_write_close(nxt_port_t *port)
144 {
145     nxt_socket_close(port->socket.task, port->pair[1]);
146     port->pair[1] = -1;
147 }
148 
149 
150 static void
151 nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
152 {
153     if (msg->allocated) {
154         nxt_free(msg);
155     }
156 }
157 
158 
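/*
 * Sends a message to the port.  When the port has a shared memory queue,
 * small (or shared-memory) payloads without a file descriptor are enqueued
 * directly and only a _NXT_PORT_MSG_READ_QUEUE notification may go through
 * the socket; other messages are announced with a _NXT_PORT_MSG_READ_SOCKET
 * marker and then written to the socket, immediately or via the pending list.
 */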
159 nxt_int_t
160 nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
161     nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
162     nxt_buf_t *b)
163 {
164     int                  notify;
165     uint8_t              qmsg_size;
166     nxt_int_t            res;
167     nxt_port_send_msg_t  msg;
168     struct {
169         nxt_port_msg_t   pm;
170         uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
171     } qmsg;
172 
173     msg.link.next = NULL;
174     msg.link.prev = NULL;
175 
176     msg.buf = b;
177     msg.share = 0;
178     msg.fd[0] = fd;
179     msg.fd[1] = fd2;
180     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
181     msg.allocated = 0;
182 
183     msg.port_msg.stream = stream;
184     msg.port_msg.pid = nxt_pid;
185     msg.port_msg.reply_port = reply_port;
186     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
187     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
188     msg.port_msg.mmap = 0;
189     msg.port_msg.nf = 0;
190     msg.port_msg.mf = 0;
191 
192     if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
193 
194         if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
195             qmsg.pm = msg.port_msg;
196 
197             qmsg_size = sizeof(qmsg.pm);
198 
199             if (b != NULL) {
200                 qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
201             }
202 
203             res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
204 
205             nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
206                       (int) port->pid, (int) port->id, port->socket.fd,
207                       (int) qmsg_size, notify, res);
208 
209             if (b != NULL && nxt_fast_path(res == NXT_OK)) {
210                 if (qmsg.pm.mmap) {
211                     b->is_port_mmap_sent = 1;
212                 }
213 
214                 b->mem.pos = b->mem.free;
215 
216                 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
217                                    b->completion_handler, task, b, b->parent);
218             }
219 
220             if (notify == 0) {
221                 return res;
222             }
223 
224             msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
225             msg.buf = NULL;
226 
227         } else {
228             qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
229 
230             res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
231 
232             nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
233                       (int) port->pid, (int) port->id, port->socket.fd,
234                       notify, res);
235 
236             if (nxt_slow_path(res == NXT_AGAIN)) {
237                 return NXT_AGAIN;
238             }
239         }
240     }
241 
242     res = nxt_port_msg_chk_insert(task, port, &msg);
243     if (nxt_fast_path(res == NXT_DECLINED)) {
244         nxt_port_write_handler(task, &port->socket, &msg);
245         res = NXT_OK;
246     }
247 
248     return res;
249 }
250 
251 
252 static nxt_bool_t
253 nxt_port_can_enqueue_buf(nxt_buf_t *b)
254 {
255     if (b == NULL) {
256         return 1;
257     }
258 
259     if (b->next != NULL) {
260         return 0;
261     }
262 
263     return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
264             || nxt_buf_is_port_mmap(b));
265 }
266 
267 
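/*
 * Serializes a buffer into the queue message: small payloads are copied
 * inline, while shared memory buffers are described by an
 * nxt_port_mmap_msg_t reference and pm->mmap is set.
 */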
268 static uint8_t
269 nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
270     nxt_buf_t *b)
271 {
272     ssize_t                  size;
273     nxt_port_mmap_msg_t      *mm;
274     nxt_port_mmap_header_t   *hdr;
275     nxt_port_mmap_handler_t  *mmap_handler;
276 
277     size = nxt_buf_mem_used_size(&b->mem);
278 
279     if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
280         nxt_memcpy(qbuf, b->mem.pos, size);
281 
282         return size;
283     }
284 
285     mmap_handler = b->parent;
286     hdr = mmap_handler->hdr;
287     mm = qbuf;
288 
289     mm->mmap_id = hdr->id;
290     mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
291     mm->size = nxt_buf_mem_used_size(&b->mem);
292 
293     pm->mmap = 1;
294 
295     nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
296               mm->size);
297 
298     return sizeof(nxt_port_mmap_msg_t);
299 }
300 
301 
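/*
 * Under the write mutex: returns NXT_DECLINED if the message can be sent
 * right away (the socket is write-ready and nothing is queued); otherwise
 * copies the message to the heap and appends it to port->messages.
 */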
302 static nxt_int_t
303 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
304     nxt_port_send_msg_t *msg)
305 {
306     nxt_int_t  res;
307 
308     nxt_thread_mutex_lock(&port->write_mutex);
309 
310     if (nxt_fast_path(port->socket.write_ready
311                       && nxt_queue_is_empty(&port->messages)))
312     {
313         res = NXT_DECLINED;
314 
315     } else {
316         msg = nxt_port_msg_alloc(msg);
317 
318         if (nxt_fast_path(msg != NULL)) {
319             nxt_queue_insert_tail(&port->messages, &msg->link);
320             nxt_port_use(task, port, 1);
321             res = NXT_OK;
322 
323         } else {
324             res = NXT_ERROR;
325         }
326     }
327 
328     nxt_thread_mutex_unlock(&port->write_mutex);
329 
330     return res;
331 }
332 
333 
334 static nxt_port_send_msg_t *
335 nxt_port_msg_alloc(const nxt_port_send_msg_t *m)
336 {
337     nxt_port_send_msg_t  *msg;
338 
339     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
340     if (nxt_slow_path(msg == NULL)) {
341         return NULL;
342     }
343 
344     *msg = *m;
345 
346     msg->allocated = 1;
347 
348     return msg;
349 }
350 
351 
352 static void
353 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
354 {
355     nxt_fd_event_block_write(task->thread->engine, &port->socket);
356 }
357 
358 
359 static void
360 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
361 {
362     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
363 }
364 
365 
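/*
 * Drains pending messages to the port socket.  Buffers are coalesced into
 * iovecs (or passed through shared memory when the payload is large enough),
 * partially sent messages are re-queued, and fully sent buffers are
 * scheduled for completion.
 */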
366 static void
367 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
368 {
369     int                     use_delta;
370     size_t                  plain_size;
371     ssize_t                 n;
372     uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
373     nxt_bool_t              block_write, enable_write;
374     nxt_port_t              *port;
375     struct iovec            iov[NXT_IOBUF_MAX * 10];
376     nxt_work_queue_t        *wq;
377     nxt_port_method_t       m;
378     nxt_port_send_msg_t     *msg;
379     nxt_sendbuf_coalesce_t  sb;
380 
381     port = nxt_container_of(obj, nxt_port_t, socket);
382 
383     block_write = 0;
384     enable_write = 0;
385     use_delta = 0;
386 
387     wq = &task->thread->engine->fast_work_queue;
388 
389     do {
390         if (data) {
391             msg = data;
392 
393         } else {
394             msg = nxt_port_msg_first(port);
395 
396             if (msg == NULL) {
397                 block_write = 1;
398                 goto cleanup;
399             }
400         }
401 
402 next_fragment:
403 
404         iov[0].iov_base = &msg->port_msg;
405         iov[0].iov_len = sizeof(nxt_port_msg_t);
406 
407         sb.buf = msg->buf;
408         sb.iobuf = &iov[1];
409         sb.nmax = NXT_IOBUF_MAX - 1;
410         sb.sync = 0;
411         sb.last = 0;
412         sb.size = 0;
413         sb.limit = port->max_size;
414 
415         sb.limit_reached = 0;
416         sb.nmax_reached = 0;
417 
418         m = nxt_port_mmap_get_method(task, port, msg->buf);
419 
420         if (m == NXT_PORT_METHOD_MMAP) {
421             sb.limit = (1ULL << 31) - 1;
422             sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
423                               port->max_size / PORT_MMAP_MIN_SIZE);
424         }
425 
426         sb.limit -= iov[0].iov_len;
427 
428         nxt_sendbuf_mem_coalesce(task, &sb);
429 
430         plain_size = sb.size;
431 
432         /*
433          * Sending through mmap is enabled only when the payload
434          * is bigger than PORT_MMAP_MIN_SIZE.
435          */
436         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
437             nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
438 
439         } else {
440             m = NXT_PORT_METHOD_PLAIN;
441         }
442 
443         msg->port_msg.last |= sb.last;
444         msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
445 
446         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
447 
448         if (n > 0) {
449             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
450                 nxt_alert(task, "port %d: short write: %z instead of %uz",
451                           port->socket.fd, n, sb.size + iov[0].iov_len);
452                 goto fail;
453             }
454 
455             nxt_port_msg_close_fd(msg);
456 
457             msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
458                                                m == NXT_PORT_METHOD_MMAP);
459 
460             if (msg->buf != NULL) {
461                 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
462                           msg->port_msg.stream);
463 
464                 /*
465                  * A file descriptor is sent only
466                  * in the first message of a stream.
467                  */
468                 msg->fd[0] = -1;
469                 msg->fd[1] = -1;
470                 msg->share += n;
471                 msg->port_msg.nf = 1;
472 
473                 if (msg->share >= port->max_share) {
474                     msg->share = 0;
475 
476                     if (msg->link.next != NULL) {
477                         nxt_thread_mutex_lock(&port->write_mutex);
478 
479                         nxt_queue_remove(&msg->link);
480                         nxt_queue_insert_tail(&port->messages, &msg->link);
481 
482                         nxt_thread_mutex_unlock(&port->write_mutex);
483 
484                     } else {
485                         msg = nxt_port_msg_insert_tail(port, msg);
486                         if (nxt_slow_path(msg == NULL)) {
487                             goto fail;
488                         }
489 
490                         use_delta++;
491                     }
492 
493                 } else {
494                     goto next_fragment;
495                 }
496 
497             } else {
498                 if (msg->link.next != NULL) {
499                     nxt_thread_mutex_lock(&port->write_mutex);
500 
501                     nxt_queue_remove(&msg->link);
502                     msg->link.next = NULL;
503 
504                     nxt_thread_mutex_unlock(&port->write_mutex);
505 
506                     use_delta--;
507                 }
508 
509                 nxt_port_release_send_msg(msg);
510             }
511 
512             if (data != NULL) {
513                 goto cleanup;
514             }
515 
516         } else {
517             if (nxt_slow_path(n == NXT_ERROR)) {
518                 if (msg->link.next == NULL) {
519                     nxt_port_msg_close_fd(msg);
520 
521                     nxt_port_release_send_msg(msg);
522                 }
523 
524                 goto fail;
525             }
526 
527             if (msg->link.next == NULL) {
528                 msg = nxt_port_msg_insert_tail(port, msg);
529                 if (nxt_slow_path(msg == NULL)) {
530                     goto fail;
531                 }
532 
533                 use_delta++;
534             }
535         }
536 
537     } while (port->socket.write_ready);
538 
539     if (nxt_fd_event_is_disabled(port->socket.write)) {
540         enable_write = 1;
541     }
542 
543     goto cleanup;
544 
545 fail:
546 
547     use_delta++;
548 
549     nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
550                        &port->socket);
551 
552 cleanup:
553 
554     if (block_write && nxt_fd_event_is_active(port->socket.write)) {
555         nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
556     }
557 
558     if (enable_write) {
559         nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
560     }
561 
562     if (use_delta != 0) {
563         nxt_port_use(task, port, use_delta);
564     }
565 }
566 
567 
568 static nxt_port_send_msg_t *
569 nxt_port_msg_first(nxt_port_t *port)
570 {
571     nxt_queue_link_t     *lnk;
572     nxt_port_send_msg_t  *msg;
573 
574     nxt_thread_mutex_lock(&port->write_mutex);
575 
576     lnk = nxt_queue_first(&port->messages);
577 
578     if (lnk == nxt_queue_tail(&port->messages)) {
579         msg = NULL;
580 
581     } else {
582         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
583     }
584 
585     nxt_thread_mutex_unlock(&port->write_mutex);
586 
587     return msg;
588 }
589 
590 
591 nxt_inline void
592 nxt_port_msg_close_fd(nxt_port_send_msg_t *msg)
593 {
594     if (!msg->close_fd) {
595         return;
596     }
597 
598     nxt_port_close_fds(msg->fd);
599 }
600 
601 
602 nxt_inline void
603 nxt_port_close_fds(nxt_fd_t *fd)
604 {
605     if (fd[0] != -1) {
606         nxt_fd_close(fd[0]);
607         fd[0] = -1;
608     }
609 
610     if (fd[1] != -1) {
611         nxt_fd_close(fd[1]);
612         fd[1] = -1;
613     }
614 }
615 
616 
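/*
 * Consumes 'sent' bytes from the buffer chain: fully sent buffers are
 * scheduled for completion on 'wq', and the first buffer that is not yet
 * fully sent (if any) is returned as the new head of the chain.
 */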
617 static nxt_buf_t *
618 nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
619     size_t sent, nxt_bool_t mmap_mode)
620 {
621     size_t     size;
622     nxt_buf_t  *next;
623 
624     while (b != NULL) {
625 
626         nxt_prefetch(b->next);
627 
628         if (!nxt_buf_is_sync(b)) {
629 
630             size = nxt_buf_used_size(b);
631 
632             if (size != 0) {
633 
634                 if (sent == 0) {
635                     break;
636                 }
637 
638                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
639                     /*
640                      * The buffer has been sent to the other side, which
641                      * is now responsible for releasing the shared memory bucket.
642                      */
643                     b->is_port_mmap_sent = 1;
644                 }
645 
646                 if (sent < size) {
647 
648                     if (nxt_buf_is_mem(b)) {
649                         b->mem.pos += sent;
650                     }
651 
652                     if (nxt_buf_is_file(b)) {
653                         b->file_pos += sent;
654                     }
655 
656                     break;
657                 }
658 
659                 /* b->mem.free is NULL in a file-only buffer. */
660                 b->mem.pos = b->mem.free;
661 
662                 if (nxt_buf_is_file(b)) {
663                     b->file_pos = b->file_end;
664                 }
665 
666                 sent -= size;
667             }
668         }
669 
670         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
671 
672         next = b->next;
673         b->next = NULL;
674         b = next;
675     }
676 
677     return b;
678 }
679 
680 
681 static nxt_port_send_msg_t *
682 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
683 {
684     if (msg->allocated == 0) {
685         msg = nxt_port_msg_alloc(msg);
686 
687         if (nxt_slow_path(msg == NULL)) {
688             return NULL;
689         }
690     }
691 
692     nxt_thread_mutex_lock(&port->write_mutex);
693 
694     nxt_queue_insert_tail(&port->messages, &msg->link);
695 
696     nxt_thread_mutex_unlock(&port->write_mutex);
697 
698     return msg;
699 }
700 
701 
702 void
703 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
704 {
705     port->socket.fd = port->pair[0];
706     port->socket.log = &nxt_main_log;
707 
708     port->engine = task->thread->engine;
709 
710     port->socket.read_work_queue = &port->engine->fast_work_queue;
711     port->socket.read_handler = port->queue != NULL
712                                 ? nxt_port_queue_read_handler
713                                 : nxt_port_read_handler;
714     port->socket.error_handler = nxt_port_error_handler;
715 
716     nxt_fd_event_enable_read(port->engine, &port->socket);
717 }
718 
719 
720 void
721 nxt_port_read_close(nxt_port_t *port)
722 {
723     port->socket.read_ready = 0;
724     port->socket.read = NXT_EVENT_INACTIVE;
725     nxt_socket_close(port->socket.task, port->pair[0]);
726     port->pair[0] = -1;
727 }
728 
729 
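/*
 * Reads messages from the port socket in a loop until EAGAIN, passing each
 * received message (and any file descriptors from the ancillary data) to
 * nxt_port_read_msg_process().
 */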
730 static void
731 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
732 {
733     ssize_t              n;
734     nxt_buf_t            *b;
735     nxt_int_t            ret;
736     nxt_port_t           *port;
737     nxt_recv_oob_t       oob;
738     nxt_port_recv_msg_t  msg;
739     struct iovec         iov[2];
740 
741     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
742 
743     nxt_assert(port->engine == task->thread->engine);
744 
745     for ( ;; ) {
746         b = nxt_port_buf_alloc(port);
747 
748         if (nxt_slow_path(b == NULL)) {
749             /* TODO: disable event for some time */
750         }
751 
752         iov[0].iov_base = &msg.port_msg;
753         iov[0].iov_len = sizeof(nxt_port_msg_t);
754 
755         iov[1].iov_base = b->mem.pos;
756         iov[1].iov_len = port->max_size;
757 
758         n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
759 
760         if (n > 0) {
761             msg.fd[0] = -1;
762             msg.fd[1] = -1;
763 
764             ret = nxt_socket_msg_oob_get(&oob, msg.fd,
765                                          nxt_recv_msg_cmsg_pid_ref(&msg));
766             if (nxt_slow_path(ret != NXT_OK)) {
767                 nxt_alert(task, "failed to get oob data from %d",
768                           port->socket.fd);
769 
770                 nxt_port_close_fds(msg.fd);
771 
772                 goto fail;
773             }
774 
775             msg.buf = b;
776             msg.size = n;
777 
778             nxt_port_read_msg_process(task, port, &msg);
779 
780             /*
781              * To disable instant completion or buffer reuse,
782              * the handler should reset 'msg.buf'.
783              */
784             if (msg.buf == b) {
785                 nxt_port_buf_free(port, b);
786             }
787 
788             if (port->socket.read_ready) {
789                 continue;
790             }
791 
792             return;
793         }
794 
795         if (n == NXT_AGAIN) {
796             nxt_port_buf_free(port, b);
797 
798             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
799             return;
800         }
801 
802 fail:
803         /* n == 0 || error  */
804         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
805                            nxt_port_error_handler, task, &port->socket, NULL);
806         return;
807     }
808 }
809 
810 
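/*
 * Reads messages from the shared memory queue first and falls back to the
 * socket when a _NXT_PORT_MSG_READ_SOCKET marker is dequeued or the queue
 * is empty; a socket message that arrives before its marker is parked in
 * port->socket_msg until the marker is seen.
 */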
811 static void
812 nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
813 {
814     ssize_t              n;
815     nxt_buf_t            *b;
816     nxt_int_t            ret;
817     nxt_port_t           *port;
818     struct iovec         iov[2];
819     nxt_recv_oob_t       oob;
820     nxt_port_queue_t     *queue;
821     nxt_port_recv_msg_t  msg, *smsg;
822     uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];
823 
824     port = nxt_container_of(obj, nxt_port_t, socket);
825     msg.port = port;
826 
827     nxt_assert(port->engine == task->thread->engine);
828 
829     queue = port->queue;
830     nxt_atomic_fetch_add(&queue->nitems, 1);
831 
832     for ( ;; ) {
833 
834         if (port->from_socket == 0) {
835             n = nxt_port_queue_recv(queue, qmsg);
836 
837             if (n < 0 && !port->socket.read_ready) {
838                 nxt_atomic_fetch_add(&queue->nitems, -1);
839 
840                 n = nxt_port_queue_recv(queue, qmsg);
841                 if (n < 0) {
842                     return;
843                 }
844 
845                 nxt_atomic_fetch_add(&queue->nitems, 1);
846             }
847 
848             if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
849                 port->from_socket++;
850 
851                 nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
852                           (int) port->pid, (int) port->id, port->socket.fd,
853                           port->from_socket);
854 
855                 continue;
856             }
857 
858             nxt_debug(task, "port{%d,%d} %d: dequeue %d",
859                       (int) port->pid, (int) port->id, port->socket.fd,
860                       (int) n);
861 
862         } else {
863             if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
864                 msg.port_msg = smsg->port_msg;
865                 b = smsg->buf;
866                 n = smsg->size;
867                 msg.fd[0] = smsg->fd[0];
868                 msg.fd[1] = smsg->fd[1];
869 
870                 smsg->size = 0;
871 
872                 port->from_socket--;
873 
874                 nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
875                           (int) port->pid, (int) port->id, port->socket.fd,
876                           (int) n);
877 
878                 goto process;
879             }
880 
881             n = -1;
882         }
883 
884         if (n < 0 && !port->socket.read_ready) {
885             nxt_atomic_fetch_add(&queue->nitems, -1);
886             return;
887         }
888 
889         b = nxt_port_buf_alloc(port);
890 
891         if (nxt_slow_path(b == NULL)) {
892             /* TODO: disable event for some time */
893         }
894 
895         if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
896             nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
897 
898             if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
899                 nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
900                            n - sizeof(nxt_port_msg_t));
901             }
902 
903         } else {
904             iov[0].iov_base = &msg.port_msg;
905             iov[0].iov_len = sizeof(nxt_port_msg_t);
906 
907             iov[1].iov_base = b->mem.pos;
908             iov[1].iov_len = port->max_size;
909 
910             n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
911 
912             if (n > 0) {
913                 msg.fd[0] = -1;
914                 msg.fd[1] = -1;
915 
916                 ret = nxt_socket_msg_oob_get(&oob, msg.fd,
917                                              nxt_recv_msg_cmsg_pid_ref(&msg));
918                 if (nxt_slow_path(ret != NXT_OK)) {
919                     nxt_alert(task, "failed to get oob data from %d",
920                               port->socket.fd);
921 
922                     nxt_port_close_fds(msg.fd);
923 
924                     return;
925                 }
926             }
927 
928             if (n == (ssize_t) sizeof(nxt_port_msg_t)
929                 && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
930             {
931                 nxt_port_buf_free(port, b);
932 
933                 nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
934                           (int) port->pid, (int) port->id, port->socket.fd,
935                           (int) n);
936 
937                 continue;
938             }
939 
940             nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
941                       (int) port->pid, (int) port->id, port->socket.fd,
942                       (int) n);
943 
944             if (n > 0) {
945                 if (port->from_socket == 0) {
946                     nxt_debug(task, "port{%d,%d} %d: suspend message %d",
947                               (int) port->pid, (int) port->id, port->socket.fd,
948                               (int) n);
949 
950                     smsg = port->socket_msg;
951 
952                     if (nxt_slow_path(smsg == NULL)) {
953                         smsg = nxt_mp_alloc(port->mem_pool,
954                                             sizeof(nxt_port_recv_msg_t));
955 
956                         if (nxt_slow_path(smsg == NULL)) {
957                             nxt_alert(task, "port{%d,%d} %d: suspend message "
958                                             "failed",
959                                       (int) port->pid, (int) port->id,
960                                       port->socket.fd);
961 
962                             return;
963                         }
964 
965                         port->socket_msg = smsg;
966 
967                     } else {
968                         if (nxt_slow_path(smsg->size != 0)) {
969                             nxt_alert(task, "port{%d,%d} %d: too many suspend "
970                                             "messages",
971                                       (int) port->pid, (int) port->id,
972                                       port->socket.fd);
973 
974                             return;
975                         }
976                     }
977 
978                     smsg->port_msg = msg.port_msg;
979                     smsg->buf = b;
980                     smsg->size = n;
981                     smsg->fd[0] = msg.fd[0];
982                     smsg->fd[1] = msg.fd[1];
983 
984                     continue;
985                 }
986 
987                 port->from_socket--;
988             }
989         }
990 
991     process:
992 
993         if (n > 0) {
994             msg.buf = b;
995             msg.size = n;
996 
997             nxt_port_read_msg_process(task, port, &msg);
998 
999             /*
1000              * To disable instant completion or buffer reuse,
1001              * the handler should reset 'msg.buf'.
1002              */
1003             if (msg.buf == b) {
1004                 nxt_port_buf_free(port, b);
1005             }
1006 
1007             continue;
1008         }
1009 
1010         if (n == NXT_AGAIN) {
1011             nxt_port_buf_free(port, b);
1012 
1013             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
1014 
1015             continue;
1016         }
1017 
1018         /* n == 0 || n == NXT_ERROR */
1019 
1020         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1021                            nxt_port_error_handler, task, &port->socket, NULL);
1022         return;
1023     }
1024 }
1025 
1026 
1027 typedef struct {
1028     uint32_t  stream;
1029     uint32_t  pid;
1030 } nxt_port_frag_key_t;
1031 
1032 
1033 static nxt_int_t
1034 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
1035 {
1036     nxt_port_recv_msg_t  *fmsg;
1037     nxt_port_frag_key_t  *frag_key;
1038 
1039     fmsg = data;
1040     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1041 
1042     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
1043         && frag_key->stream == fmsg->port_msg.stream
1044         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1045     {
1046         return NXT_OK;
1047     }
1048 
1049     return NXT_DECLINED;
1050 }
1051 
1052 
1053 static void *
1054 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
1055 {
1056     return nxt_mp_align(ctx, size, size);
1057 }
1058 
1059 
1060 static void
1061 nxt_port_lvlhsh_frag_free(void *ctx, void *p)
1062 {
1063     nxt_mp_free(ctx, p);
1064 }
1065 
1066 
1067 static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
1068     NXT_LVLHSH_DEFAULT,
1069     nxt_port_lvlhsh_frag_test,
1070     nxt_port_lvlhsh_frag_alloc,
1071     nxt_port_lvlhsh_frag_free,
1072 };
1073 
1074 
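/*
 * Stores the first fragment of a multi-part message in the port->frags
 * level hash, keyed by the (stream, pid) pair, so that later fragments
 * can be appended to it.
 */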
1075 static nxt_port_recv_msg_t *
1076 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1077     nxt_port_recv_msg_t *msg)
1078 {
1079     nxt_int_t            res;
1080     nxt_lvlhsh_query_t   lhq;
1081     nxt_port_recv_msg_t  *fmsg;
1082     nxt_port_frag_key_t  frag_key;
1083 
1084     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1085 
1086     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1087 
1088     if (nxt_slow_path(fmsg == NULL)) {
1089         return NULL;
1090     }
1091 
1092     *fmsg = *msg;
1093 
1094     frag_key.stream = fmsg->port_msg.stream;
1095     frag_key.pid = fmsg->port_msg.pid;
1096 
1097     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1098     lhq.key.length = sizeof(nxt_port_frag_key_t);
1099     lhq.key.start = (u_char *) &frag_key;
1100     lhq.proto = &lvlhsh_frag_proto;
1101     lhq.replace = 0;
1102     lhq.value = fmsg;
1103     lhq.pool = port->mem_pool;
1104 
1105     res = nxt_lvlhsh_insert(&port->frags, &lhq);
1106 
1107     switch (res) {
1108 
1109     case NXT_OK:
1110         return fmsg;
1111 
1112     case NXT_DECLINED:
1113         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1114                 fmsg->port_msg.stream);
1115         nxt_mp_free(port->mem_pool, fmsg);
1116 
1117         return NULL;
1118 
1119     default:
1120         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1121                 fmsg->port_msg.stream);
1122 
1123         nxt_mp_free(port->mem_pool, fmsg);
1124 
1125         return NULL;
1126 
1127     }
1128 }
1129 
1130 
1131 static nxt_port_recv_msg_t *
1132 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1133 {
1134     nxt_int_t            res;
1135     nxt_bool_t           last;
1136     nxt_lvlhsh_query_t   lhq;
1137     nxt_port_frag_key_t  frag_key;
1138 
1139     last = msg->port_msg.mf == 0;
1140 
1141     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
1142               msg->port_msg.stream);
1143 
1144     frag_key.stream = msg->port_msg.stream;
1145     frag_key.pid = msg->port_msg.pid;
1146 
1147     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1148     lhq.key.length = sizeof(nxt_port_frag_key_t);
1149     lhq.key.start = (u_char *) &frag_key;
1150     lhq.proto = &lvlhsh_frag_proto;
1151     lhq.pool = port->mem_pool;
1152 
1153     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1154           nxt_lvlhsh_find(&port->frags, &lhq);
1155 
1156     switch (res) {
1157 
1158     case NXT_OK:
1159         return lhq.value;
1160 
1161     default:
1162         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
1163                 frag_key.stream);
1164 
1165         return NULL;
1166     }
1167 }
1168 
1169 
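/*
 * Dispatches a received message: fragmented streams are reassembled through
 * port->frags, shared memory payloads are mapped into buffer chains, and the
 * port handler is invoked; mmap buffers are completed here unless the handler
 * takes ownership by resetting 'msg->buf'.
 */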
1170 static void
1171 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
1172     nxt_port_recv_msg_t *msg)
1173 {
1174     nxt_buf_t            *b, *orig_b, *next;
1175     nxt_port_recv_msg_t  *fmsg;
1176 
1177     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
1178         nxt_alert(task, "port %d: too small message:%uz",
1179                   port->socket.fd, msg->size);
1180 
1181         nxt_port_close_fds(msg->fd);
1182 
1183         return;
1184     }
1185 
1186     /* Adjust the size to the actual payload size (without the header). */
1187     msg->size -= sizeof(nxt_port_msg_t);
1188 
1189     b = orig_b = msg->buf;
1190     b->mem.free += msg->size;
1191 
1192     msg->cancelled = 0;
1193 
1194     if (nxt_slow_path(msg->port_msg.nf != 0)) {
1195 
1196         fmsg = nxt_port_frag_find(task, port, msg);
1197 
1198         if (nxt_slow_path(fmsg == NULL)) {
1199             goto fmsg_failed;
1200         }
1201 
1202         if (nxt_fast_path(fmsg->cancelled == 0)) {
1203 
1204             if (msg->port_msg.mmap) {
1205                 nxt_port_mmap_read(task, msg);
1206             }
1207 
1208             nxt_buf_chain_add(&fmsg->buf, msg->buf);
1209 
1210             fmsg->size += msg->size;
1211             msg->buf = NULL;
1212             b = NULL;
1213 
1214             if (nxt_fast_path(msg->port_msg.mf == 0)) {
1215 
1216                 b = fmsg->buf;
1217 
1218                 port->handler(task, fmsg);
1219 
1220                 msg->buf = fmsg->buf;
1221                 msg->fd[0] = fmsg->fd[0];
1222                 msg->fd[1] = fmsg->fd[1];
1223 
1224                 /*
1225                  * To disable instant completion or buffer reuse,
1226                  * the handler should reset 'msg->buf'.
1227                  */
1228                 if (!msg->port_msg.mmap && msg->buf == b) {
1229                     nxt_port_buf_free(port, b);
1230                 }
1231             }
1232         }
1233 
1234         if (nxt_fast_path(msg->port_msg.mf == 0)) {
1235             nxt_mp_free(port->mem_pool, fmsg);
1236         }
1237     } else {
1238         if (nxt_slow_path(msg->port_msg.mf != 0)) {
1239 
1240             if (msg->port_msg.mmap && msg->cancelled == 0) {
1241                 nxt_port_mmap_read(task, msg);
1242                 b = msg->buf;
1243             }
1244 
1245             fmsg = nxt_port_frag_start(task, port, msg);
1246 
1247             if (nxt_slow_path(fmsg == NULL)) {
1248                 goto fmsg_failed;
1249             }
1250 
1251             fmsg->port_msg.nf = 0;
1252             fmsg->port_msg.mf = 0;
1253 
1254             if (nxt_fast_path(msg->cancelled == 0)) {
1255                 msg->buf = NULL;
1256                 msg->fd[0] = -1;
1257                 msg->fd[1] = -1;
1258                 b = NULL;
1259 
1260             } else {
1261                 nxt_port_close_fds(msg->fd);
1262             }
1263         } else {
1264             if (nxt_fast_path(msg->cancelled == 0)) {
1265 
1266                 if (msg->port_msg.mmap) {
1267                     nxt_port_mmap_read(task, msg);
1268                     b = msg->buf;
1269                 }
1270 
1271                 port->handler(task, msg);
1272             }
1273         }
1274     }
1275 
1276 fmsg_failed:
1277 
1278     if (msg->port_msg.mmap && orig_b != b) {
1279 
1280         /*
1281          * To disable instant buffer completion,
1282          * the handler should reset 'msg->buf'.
1283          */
1284         if (msg->buf == b) {
1285             /* complete mmap buffers */
1286             while (b != NULL) {
1287                 nxt_debug(task, "complete buffer %p", b);
1288 
1289                 nxt_work_queue_add(port->socket.read_work_queue,
1290                     b->completion_handler, task, b, b->parent);
1291 
1292                 next = b->next;
1293                 b->next = NULL;
1294                 b = next;
1295             }
1296         }
1297 
1298         /* restore original buf */
1299         msg->buf = orig_b;
1300     }
1301 }
1302 
1303 
1304 static nxt_buf_t *
1305 nxt_port_buf_alloc(nxt_port_t *port)
1306 {
1307     nxt_buf_t  *b;
1308 
1309     if (port->free_bufs != NULL) {
1310         b = port->free_bufs;
1311         port->free_bufs = b->next;
1312 
1313         b->mem.pos = b->mem.start;
1314         b->mem.free = b->mem.start;
1315         b->next = NULL;
1316     } else {
1317         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
1318         if (nxt_slow_path(b == NULL)) {
1319             return NULL;
1320         }
1321     }
1322 
1323     return b;
1324 }
1325 
1326 
1327 static void
1328 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
1329 {
1330     nxt_buf_chain_add(&b, port->free_bufs);
1331     port->free_bufs = b;
1332 }
1333 
1334 
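/*
 * Socket error handler: closes file descriptors still attached to queued
 * messages, completes their buffers, removes the messages from the pending
 * list, and drops the corresponding port references.
 */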
1335 static void
1336 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
1337 {
1338     int                  use_delta;
1339     nxt_buf_t            *b, *next;
1340     nxt_port_t           *port;
1341     nxt_work_queue_t     *wq;
1342     nxt_port_send_msg_t  *msg;
1343 
1344     nxt_debug(task, "port error handler %p", obj);
1345     /* TODO */
1346 
1347     port = nxt_container_of(obj, nxt_port_t, socket);
1348 
1349     use_delta = 0;
1350 
1351     if (obj == data) {
1352         use_delta--;
1353     }
1354 
1355     wq = &task->thread->engine->fast_work_queue;
1356 
1357     nxt_thread_mutex_lock(&port->write_mutex);
1358 
1359     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
1360 
1361         nxt_port_msg_close_fd(msg);
1362 
1363         for (b = msg->buf; b != NULL; b = next) {
1364             next = b->next;
1365             b->next = NULL;
1366 
1367             if (nxt_buf_is_sync(b)) {
1368                 continue;
1369             }
1370 
1371             nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1372         }
1373 
1374         nxt_queue_remove(&msg->link);
1375         use_delta--;
1376 
1377         nxt_port_release_send_msg(msg);
1378 
1379     } nxt_queue_loop;
1380 
1381     nxt_thread_mutex_unlock(&port->write_mutex);
1382 
1383     if (use_delta != 0) {
1384         nxt_port_use(task, port, use_delta);
1385     }
1386 }
1387