1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_socket_msg.h>
9 #include <nxt_port_queue.h>
10 #include <nxt_port_memory_int.h>
11
12
13 #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
14 (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
15
16
17 static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
18 static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
19 void *qbuf, nxt_buf_t *b);
20 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
21 nxt_port_send_msg_t *msg);
22 static nxt_port_send_msg_t *nxt_port_msg_alloc(const nxt_port_send_msg_t *m);
23 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
24 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
25 nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg);
26 nxt_inline void nxt_port_close_fds(nxt_fd_t *fd);
27 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
28 nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
29 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
30 nxt_port_send_msg_t *msg);
31 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
32 static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
33 void *data);
34 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
35 nxt_port_recv_msg_t *msg);
36 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
37 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
38 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
39
40
/*
 * Creates the socketpair used for inter-process messaging and tunes the
 * kernel send/receive buffer sizes so that a message of up to max_size
 * bytes can be written at once.  Returns NXT_OK on success or NXT_ERROR;
 * on failure no descriptors are left open.
 */
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
    nxt_int_t     sndbuf, rcvbuf, size;
    nxt_socket_t  snd, rcv;

    port->socket.task = task;

    port->pair[0] = -1;
    port->pair[1] = -1;

    if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
        goto socketpair_fail;
    }

    snd = port->pair[1];

    sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(sndbuf < 0)) {
        goto getsockopt_fail;
    }

    rcv = port->pair[0];

    rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcvbuf < 0)) {
        goto getsockopt_fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) sndbuf < max_size) {
        /*
         * On Unix domain sockets
         * Linux uses 224K on both send and receive directions;
         * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         * on send direction and 4K buffer size on receive direction;
         * Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
                                     max_size);

        /* Re-read the value: the kernel may clamp the requested size. */
        sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(sndbuf < 0)) {
            goto getsockopt_fail;
        }

        size = sndbuf * 4;

        if (rcvbuf < size) {
            (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
                                         size);

            rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
            if (nxt_slow_path(rcvbuf < 0)) {
                goto getsockopt_fail;
            }
        }
    }

    /* A single message never exceeds what the send buffer can absorb. */
    port->max_size = nxt_min(max_size, (size_t) sndbuf);
    port->max_share = (64 * 1024);

    return NXT_OK;

getsockopt_fail:

    nxt_socket_close(task, port->pair[0]);
    nxt_socket_close(task, port->pair[1]);

socketpair_fail:

    return NXT_ERROR;
}
117
118
/*
 * Releases the port's resources: closes the active socket descriptor and
 * destroys the memory pool that owns the port's buffers.
 */
void
nxt_port_destroy(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}
125
126
/*
 * Binds the sending end of the socketpair to the current engine and
 * installs the write and error handlers.  Note that the write event is
 * not armed here; nxt_port_write_handler() arms it when a send blocks.
 */
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;

    port->engine = task->thread->engine;

    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;
}
140
141
/* Closes the sending end of the socketpair and marks the slot unused. */
void
nxt_port_write_close(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->pair[1]);
    port->pair[1] = -1;
}
148
149
150 static void
nxt_port_release_send_msg(nxt_port_send_msg_t * msg)151 nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
152 {
153 if (msg->allocated) {
154 nxt_free(msg);
155 }
156 }
157
158
/*
 * Sends a message with an optional payload buffer and up to two file
 * descriptors to the peer process.  When the port has a shared-memory
 * queue, a small (or mmap'ed) payload without descriptors is placed
 * directly into the queue and the socket carries only a wake-up
 * notification; otherwise the message goes through the socketpair,
 * either written immediately or appended to the pending-message list.
 *
 * Returns NXT_OK, NXT_AGAIN (queue full), or NXT_ERROR.
 */
nxt_int_t
nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
    nxt_buf_t *b)
{
    int                  notify;
    uint8_t              qmsg_size;
    nxt_int_t            res;
    nxt_port_send_msg_t  msg;
    struct {
        nxt_port_msg_t   pm;
        uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
    } qmsg;

    /* The message is built on the stack; it is copied to the heap only
     * if it has to be queued (see nxt_port_msg_chk_insert()). */
    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.share = 0;
    msg.fd[0] = fd;
    msg.fd[1] = fd2;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.allocated = 0;

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;
    msg.port_msg.mf = 0;

    if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {

        if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
            /* Fast path: message and payload fit the queue element. */
            qmsg.pm = msg.port_msg;

            qmsg_size = sizeof(qmsg.pm);

            if (b != NULL) {
                qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
            }

            res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) qmsg_size, notify, res);

            if (b != NULL && nxt_fast_path(res == NXT_OK)) {
                if (qmsg.pm.mmap) {
                    /* Peer now owns the shared-memory chunk. */
                    b->is_port_mmap_sent = 1;
                }

                b->mem.pos = b->mem.free;

                nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                                   b->completion_handler, task, b, b->parent);
            }

            if (notify == 0) {
                return res;
            }

            /* Peer is idle; send a wake-up through the socket. */
            msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
            msg.buf = NULL;

        } else {
            /* Payload can't be enqueued; tell the peer to read the socket. */
            qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;

            res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      notify, res);

            if (nxt_slow_path(res == NXT_AGAIN)) {
                return NXT_AGAIN;
            }
        }
    }

    /* Try the direct write; otherwise the message was queued (or failed). */
    res = nxt_port_msg_chk_insert(task, port, &msg);
    if (nxt_fast_path(res == NXT_DECLINED)) {
        nxt_port_write_handler(task, &port->socket, &msg);
        res = NXT_OK;
    }

    return res;
}
250
251
252 static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t * b)253 nxt_port_can_enqueue_buf(nxt_buf_t *b)
254 {
255 if (b == NULL) {
256 return 1;
257 }
258
259 if (b->next != NULL) {
260 return 0;
261 }
262
263 return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
264 || nxt_buf_is_port_mmap(b));
265 }
266
267
268 static uint8_t
nxt_port_enqueue_buf(nxt_task_t * task,nxt_port_msg_t * pm,void * qbuf,nxt_buf_t * b)269 nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
270 nxt_buf_t *b)
271 {
272 ssize_t size;
273 nxt_port_mmap_msg_t *mm;
274 nxt_port_mmap_header_t *hdr;
275 nxt_port_mmap_handler_t *mmap_handler;
276
277 size = nxt_buf_mem_used_size(&b->mem);
278
279 if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
280 nxt_memcpy(qbuf, b->mem.pos, size);
281
282 return size;
283 }
284
285 mmap_handler = b->parent;
286 hdr = mmap_handler->hdr;
287 mm = qbuf;
288
289 mm->mmap_id = hdr->id;
290 mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
291 mm->size = nxt_buf_mem_used_size(&b->mem);
292
293 pm->mmap = 1;
294
295 nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
296 mm->size);
297
298 return sizeof(nxt_port_mmap_msg_t);
299 }
300
301
302 static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t * task,nxt_port_t * port,nxt_port_send_msg_t * msg)303 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
304 nxt_port_send_msg_t *msg)
305 {
306 nxt_int_t res;
307
308 nxt_thread_mutex_lock(&port->write_mutex);
309
310 if (nxt_fast_path(port->socket.write_ready
311 && nxt_queue_is_empty(&port->messages)))
312 {
313 res = NXT_DECLINED;
314
315 } else {
316 msg = nxt_port_msg_alloc(msg);
317
318 if (nxt_fast_path(msg != NULL)) {
319 nxt_queue_insert_tail(&port->messages, &msg->link);
320 nxt_port_use(task, port, 1);
321 res = NXT_OK;
322
323 } else {
324 res = NXT_ERROR;
325 }
326 }
327
328 nxt_thread_mutex_unlock(&port->write_mutex);
329
330 return res;
331 }
332
333
334 static nxt_port_send_msg_t *
nxt_port_msg_alloc(const nxt_port_send_msg_t * m)335 nxt_port_msg_alloc(const nxt_port_send_msg_t *m)
336 {
337 nxt_port_send_msg_t *msg;
338
339 msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
340 if (nxt_slow_path(msg == NULL)) {
341 return NULL;
342 }
343
344 *msg = *m;
345
346 msg->allocated = 1;
347
348 return msg;
349 }
350
351
/* Engine-thread callback: stop polling the port socket for writability. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}
357
358
/* Engine-thread callback: resume polling the port socket for writability. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}
364
365
/*
 * Drains the port's outgoing messages while the socket stays writable.
 * Called either directly from nxt_port_socket_write2() with 'data'
 * pointing at a stack-resident message, or as the socket write-event
 * handler with data == NULL, in which case messages are taken from the
 * pending queue.  Large payloads may be sent through shared memory
 * (NXT_PORT_METHOD_MMAP) and are fragmented across several writes; the
 * 'mf' (more fragments) and 'nf' (not first) header bits track that.
 * 'use_delta' accumulates port reference-count changes, applied once at
 * the end.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Nothing left to send; stop polling for writability. */
                block_write = 1;
                goto cleanup;
            }
        }

    next_fragment:

        /* iov[0] always carries the fixed message header. */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        msg->port_msg.last |= sb.last;
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            nxt_port_msg_close_fd(msg);

            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                /* Part of the payload remains: continue as a fragment. */
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                msg->share += n;
                msg->port_msg.nf = 1;

                if (msg->share >= port->max_share) {
                    /* Fair sharing: move this stream to the queue tail. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Message fully sent; unlink and release it. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            if (data != NULL) {
                /* Direct-write call: handle only the supplied message. */
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                if (msg->link.next == NULL) {
                    nxt_port_msg_close_fd(msg);

                    nxt_port_release_send_msg(msg);
                }

                goto fail;
            }

            /* Would block: keep the message queued for the write event. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    /* Event-state changes are posted to the port's own engine thread. */
    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
566
567
568 static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t * port)569 nxt_port_msg_first(nxt_port_t *port)
570 {
571 nxt_queue_link_t *lnk;
572 nxt_port_send_msg_t *msg;
573
574 nxt_thread_mutex_lock(&port->write_mutex);
575
576 lnk = nxt_queue_first(&port->messages);
577
578 if (lnk == nxt_queue_tail(&port->messages)) {
579 msg = NULL;
580
581 } else {
582 msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
583 }
584
585 nxt_thread_mutex_unlock(&port->write_mutex);
586
587 return msg;
588 }
589
590
591 nxt_inline void
nxt_port_msg_close_fd(nxt_port_send_msg_t * msg)592 nxt_port_msg_close_fd(nxt_port_send_msg_t *msg)
593 {
594 if (!msg->close_fd) {
595 return;
596 }
597
598 nxt_port_close_fds(msg->fd);
599 }
600
601
602 nxt_inline void
nxt_port_close_fds(nxt_fd_t * fd)603 nxt_port_close_fds(nxt_fd_t *fd)
604 {
605 if (fd[0] != -1) {
606 nxt_fd_close(fd[0]);
607 fd[0] = -1;
608 }
609
610 if (fd[1] != -1) {
611 nxt_fd_close(fd[1]);
612 fd[1] = -1;
613 }
614 }
615
616
/*
 * Walks the buffer chain accounting for 'sent' bytes of a completed
 * write.  Fully-sent buffers have their completion handlers posted to
 * 'wq' and are unlinked; a partially-sent buffer has its position
 * advanced and the walk stops.  Returns the first buffer that still has
 * unsent data, or NULL when the whole chain was consumed.  In mmap_mode
 * the plain 'sent' count does not cover shared-memory buffers, which are
 * instead marked as transferred to the peer.
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {

        nxt_prefetch(b->next);

        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                if (sent < size) {

                    /* Partial send: advance and keep this buffer. */
                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}
679
680
681 static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t * port,nxt_port_send_msg_t * msg)682 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
683 {
684 if (msg->allocated == 0) {
685 msg = nxt_port_msg_alloc(msg);
686
687 if (nxt_slow_path(msg == NULL)) {
688 return NULL;
689 }
690 }
691
692 nxt_thread_mutex_lock(&port->write_mutex);
693
694 nxt_queue_insert_tail(&port->messages, &msg->link);
695
696 nxt_thread_mutex_unlock(&port->write_mutex);
697
698 return msg;
699 }
700
701
/*
 * Binds the receiving end of the socketpair to the current engine,
 * selects the queue-aware read handler when the port has a shared-memory
 * queue, and starts polling for incoming messages.
 */
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[0];
    port->socket.log = &nxt_main_log;

    port->engine = task->thread->engine;

    port->socket.read_work_queue = &port->engine->fast_work_queue;
    port->socket.read_handler = port->queue != NULL
                                ? nxt_port_queue_read_handler
                                : nxt_port_read_handler;
    port->socket.error_handler = nxt_port_error_handler;

    nxt_fd_event_enable_read(port->engine, &port->socket);
}
718
719
/* Stops reading and closes the receiving end of the socketpair. */
void
nxt_port_read_close(nxt_port_t *port)
{
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;
    nxt_socket_close(port->socket.task, port->pair[0]);
    port->pair[0] = -1;
}
728
729
730 static void
nxt_port_read_handler(nxt_task_t * task,void * obj,void * data)731 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
732 {
733 ssize_t n;
734 nxt_buf_t *b;
735 nxt_int_t ret;
736 nxt_port_t *port;
737 nxt_recv_oob_t oob;
738 nxt_port_recv_msg_t msg;
739 struct iovec iov[2];
740
741 port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
742
743 nxt_assert(port->engine == task->thread->engine);
744
745 for ( ;; ) {
746 b = nxt_port_buf_alloc(port);
747
748 if (nxt_slow_path(b == NULL)) {
749 /* TODO: disable event for some time */
750 }
751
752 iov[0].iov_base = &msg.port_msg;
753 iov[0].iov_len = sizeof(nxt_port_msg_t);
754
755 iov[1].iov_base = b->mem.pos;
756 iov[1].iov_len = port->max_size;
757
758 n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
759
760 if (n > 0) {
761 msg.fd[0] = -1;
762 msg.fd[1] = -1;
763
764 ret = nxt_socket_msg_oob_get(&oob, msg.fd,
765 nxt_recv_msg_cmsg_pid_ref(&msg));
766 if (nxt_slow_path(ret != NXT_OK)) {
767 nxt_alert(task, "failed to get oob data from %d",
768 port->socket.fd);
769
770 nxt_port_close_fds(msg.fd);
771
772 goto fail;
773 }
774
775 msg.buf = b;
776 msg.size = n;
777
778 nxt_port_read_msg_process(task, port, &msg);
779
780 /*
781 * To disable instant completion or buffer re-usage,
782 * handler should reset 'msg.buf'.
783 */
784 if (msg.buf == b) {
785 nxt_port_buf_free(port, b);
786 }
787
788 if (port->socket.read_ready) {
789 continue;
790 }
791
792 return;
793 }
794
795 if (n == NXT_AGAIN) {
796 nxt_port_buf_free(port, b);
797
798 nxt_fd_event_enable_read(task->thread->engine, &port->socket);
799 return;
800 }
801
802 fail:
803 /* n == 0 || error */
804 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
805 nxt_port_error_handler, task, &port->socket, NULL);
806 return;
807 }
808 }
809
810
/*
 * Read-event handler for ports backed by a shared-memory queue.  Messages
 * normally arrive via the queue; two sentinel messages coordinate with
 * the socket: _NXT_PORT_MSG_READ_SOCKET means the next real message is on
 * the socketpair (counted in port->from_socket), and a bare header of
 * type _NXT_PORT_MSG_READ_QUEUE is just a wake-up to drain the queue.
 * A message read from the socket while from_socket == 0 arrived ahead of
 * its READ_SOCKET sentinel and is suspended in port->socket_msg until the
 * sentinel is dequeued, preserving ordering.  queue->nitems tracks the
 * number of active readers/pending items around blocking decisions.
 */
static void
nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_int_t            ret;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_recv_oob_t       oob;
    nxt_port_queue_t     *queue;
    nxt_port_recv_msg_t  msg, *smsg;
    uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];

    port = nxt_container_of(obj, nxt_port_t, socket);
    msg.port = port;

    nxt_assert(port->engine == task->thread->engine);

    queue = port->queue;
    nxt_atomic_fetch_add(&queue->nitems, 1);

    for ( ;; ) {

        if (port->from_socket == 0) {
            n = nxt_port_queue_recv(queue, qmsg);

            if (n < 0 && !port->socket.read_ready) {
                /* Drop our reader mark, then re-check for a racing send. */
                nxt_atomic_fetch_add(&queue->nitems, -1);

                n = nxt_port_queue_recv(queue, qmsg);
                if (n < 0) {
                    return;
                }

                nxt_atomic_fetch_add(&queue->nitems, 1);
            }

            if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
                /* The matching payload must be read from the socket. */
                port->from_socket++;

                nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          port->from_socket);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: dequeue %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

        } else {
            /* A socket message is owed; use a suspended one if present. */
            if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
                msg.port_msg = smsg->port_msg;
                b = smsg->buf;
                n = smsg->size;
                msg.fd[0] = smsg->fd[0];
                msg.fd[1] = smsg->fd[1];

                smsg->size = 0;

                port->from_socket--;

                nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                goto process;
            }

            n = -1;
        }

        if (n < 0 && !port->socket.read_ready) {
            nxt_atomic_fetch_add(&queue->nitems, -1);
            return;
        }

        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */
            /* NOTE(review): b is dereferenced below; on allocation
             * failure this looks like a NULL dereference — confirm. */
        }

        if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
            /* Full message came from the queue; copy header and payload. */
            nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));

            if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
                nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
                           n - sizeof(nxt_port_msg_t));
            }

        } else {
            iov[0].iov_base = &msg.port_msg;
            iov[0].iov_len = sizeof(nxt_port_msg_t);

            iov[1].iov_base = b->mem.pos;
            iov[1].iov_len = port->max_size;

            n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);

            if (n > 0) {
                msg.fd[0] = -1;
                msg.fd[1] = -1;

                ret = nxt_socket_msg_oob_get(&oob, msg.fd,
                                             nxt_recv_msg_cmsg_pid_ref(&msg));
                if (nxt_slow_path(ret != NXT_OK)) {
                    nxt_alert(task, "failed to get oob data from %d",
                              port->socket.fd);

                    nxt_port_close_fds(msg.fd);

                    return;
                }
            }

            if (n == (ssize_t) sizeof(nxt_port_msg_t)
                && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
            {
                /* Wake-up only: go back and drain the queue. */
                nxt_port_buf_free(port, b);

                nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

            if (n > 0) {
                if (port->from_socket == 0) {
                    /*
                     * Socket message arrived before its READ_SOCKET
                     * sentinel; park it until the sentinel is dequeued.
                     */
                    nxt_debug(task, "port{%d,%d} %d: suspend message %d",
                              (int) port->pid, (int) port->id, port->socket.fd,
                              (int) n);

                    smsg = port->socket_msg;

                    if (nxt_slow_path(smsg == NULL)) {
                        smsg = nxt_mp_alloc(port->mem_pool,
                                            sizeof(nxt_port_recv_msg_t));

                        if (nxt_slow_path(smsg == NULL)) {
                            nxt_alert(task, "port{%d,%d} %d: suspend message "
                                      "failed",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }

                        port->socket_msg = smsg;

                    } else {
                        if (nxt_slow_path(smsg->size != 0)) {
                            nxt_alert(task, "port{%d,%d} %d: too many suspend "
                                      "messages",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }
                    }

                    smsg->port_msg = msg.port_msg;
                    smsg->buf = b;
                    smsg->size = n;
                    smsg->fd[0] = msg.fd[0];
                    smsg->fd[1] = msg.fd[1];

                    continue;
                }

                port->from_socket--;
            }
        }

    process:

        if (n > 0) {
            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            continue;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);

            continue;
        }

        /* n == 0 || n == NXT_ERROR */

        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}
1025
1026
/* Hash key identifying a fragmented stream: (stream id, sender pid). */
typedef struct {
    uint32_t  stream;
    uint32_t  pid;
} nxt_port_frag_key_t;
1031
1032
1033 static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t * lhq,void * data)1034 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
1035 {
1036 nxt_port_recv_msg_t *fmsg;
1037 nxt_port_frag_key_t *frag_key;
1038
1039 fmsg = data;
1040 frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1041
1042 if (lhq->key.length == sizeof(nxt_port_frag_key_t)
1043 && frag_key->stream == fmsg->port_msg.stream
1044 && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1045 {
1046 return NXT_OK;
1047 }
1048
1049 return NXT_DECLINED;
1050 }
1051
1052
/* lvlhsh allocator: nodes come from the port's memory pool (ctx). */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}
1058
1059
/* lvlhsh deallocator: returns a node to the port's memory pool (ctx). */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}
1065
1066
/* lvlhsh descriptor for the per-port fragment reassembly table. */
static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};
1073
1074
1075 static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)1076 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1077 nxt_port_recv_msg_t *msg)
1078 {
1079 nxt_int_t res;
1080 nxt_lvlhsh_query_t lhq;
1081 nxt_port_recv_msg_t *fmsg;
1082 nxt_port_frag_key_t frag_key;
1083
1084 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1085
1086 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1087
1088 if (nxt_slow_path(fmsg == NULL)) {
1089 return NULL;
1090 }
1091
1092 *fmsg = *msg;
1093
1094 frag_key.stream = fmsg->port_msg.stream;
1095 frag_key.pid = fmsg->port_msg.pid;
1096
1097 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1098 lhq.key.length = sizeof(nxt_port_frag_key_t);
1099 lhq.key.start = (u_char *) &frag_key;
1100 lhq.proto = &lvlhsh_frag_proto;
1101 lhq.replace = 0;
1102 lhq.value = fmsg;
1103 lhq.pool = port->mem_pool;
1104
1105 res = nxt_lvlhsh_insert(&port->frags, &lhq);
1106
1107 switch (res) {
1108
1109 case NXT_OK:
1110 return fmsg;
1111
1112 case NXT_DECLINED:
1113 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1114 fmsg->port_msg.stream);
1115 nxt_mp_free(port->mem_pool, fmsg);
1116
1117 return NULL;
1118
1119 default:
1120 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1121 fmsg->port_msg.stream);
1122
1123 nxt_mp_free(port->mem_pool, fmsg);
1124
1125 return NULL;
1126
1127 }
1128 }
1129
1130
1131 static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)1132 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1133 {
1134 nxt_int_t res;
1135 nxt_bool_t last;
1136 nxt_lvlhsh_query_t lhq;
1137 nxt_port_frag_key_t frag_key;
1138
1139 last = msg->port_msg.mf == 0;
1140
1141 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
1142 msg->port_msg.stream);
1143
1144 frag_key.stream = msg->port_msg.stream;
1145 frag_key.pid = msg->port_msg.pid;
1146
1147 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
1148 lhq.key.length = sizeof(nxt_port_frag_key_t);
1149 lhq.key.start = (u_char *) &frag_key;
1150 lhq.proto = &lvlhsh_frag_proto;
1151 lhq.pool = port->mem_pool;
1152
1153 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1154 nxt_lvlhsh_find(&port->frags, &lhq);
1155
1156 switch (res) {
1157
1158 case NXT_OK:
1159 return lhq.value;
1160
1161 default:
1162 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
1163 frag_key.stream);
1164
1165 return NULL;
1166 }
1167 }
1168
1169
/*
 * Validates and dispatches one received message.  Continuation fragments
 * (nf set) are appended to their reassembly entry and delivered to the
 * port handler once the last fragment (mf clear) arrives; first fragments
 * of a multi-part stream (mf set) start a reassembly entry; ordinary
 * messages go straight to the handler.  mmap payloads are materialized
 * via nxt_port_mmap_read(), and their buffers are completed here unless
 * the handler took ownership by resetting msg->buf.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        nxt_port_close_fds(msg->fd);

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    msg->cancelled = 0;

    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        /* Continuation fragment: append to the reassembly entry. */
        fmsg = nxt_port_frag_find(task, port, msg);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                /* Last fragment: deliver the reassembled message. */
                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd[0] = fmsg->fd[0];
                msg->fd[1] = fmsg->fd[1];

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            /* First fragment of a stream: start reassembly. */
            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Ownership moved to the reassembly entry. */
                msg->buf = NULL;
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                b = NULL;

            } else {
                nxt_port_close_fds(msg->fd);
            }
        } else {
            /* Complete, unfragmented message. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                    b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
1302
1303
1304 static nxt_buf_t *
nxt_port_buf_alloc(nxt_port_t * port)1305 nxt_port_buf_alloc(nxt_port_t *port)
1306 {
1307 nxt_buf_t *b;
1308
1309 if (port->free_bufs != NULL) {
1310 b = port->free_bufs;
1311 port->free_bufs = b->next;
1312
1313 b->mem.pos = b->mem.start;
1314 b->mem.free = b->mem.start;
1315 b->next = NULL;
1316 } else {
1317 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
1318 if (nxt_slow_path(b == NULL)) {
1319 return NULL;
1320 }
1321 }
1322
1323 return b;
1324 }
1325
1326
/* Returns a receive buffer (chain) to the port's free list for reuse. */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}
1333
1334
/*
 * Socket error handler: aborts all queued outgoing messages.  Each
 * message's descriptors are closed (when requested), its non-sync buffers
 * get their completion handlers posted, and the message is unlinked and
 * released.  'use_delta' collects the dropped port references — including
 * one extra when obj == data, the convention used by nxt_port_write_handler's
 * fail path — and applies them in a single nxt_port_use() call.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        nxt_port_msg_close_fd(msg);

        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
1387