1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 11 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 12 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 13 nxt_port_recv_msg_t *msg); 14 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 15 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 16 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 17 18 19 nxt_int_t 20 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 21 { 22 nxt_int_t sndbuf, rcvbuf, size; 23 nxt_socket_t snd, rcv; 24 25 port->socket.task = task; 26 27 port->pair[0] = -1; 28 port->pair[1] = -1; 29 30 if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 31 goto socketpair_fail; 32 } 33 34 snd = port->pair[1]; 35 36 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 37 if (nxt_slow_path(sndbuf < 0)) { 38 goto getsockopt_fail; 39 } 40 41 rcv = port->pair[0]; 42 43 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 44 if (nxt_slow_path(rcvbuf < 0)) { 45 goto getsockopt_fail; 46 } 47 48 if (max_size == 0) { 49 max_size = 16 * 1024; 50 } 51 52 if ((size_t) sndbuf < max_size) { 53 /* 54 * On Unix domain sockets 55 * Linux uses 224K on both send and receive directions; 56 * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 57 * on send direction and 4K buffer size on receive direction; 58 * Solaris uses 16K on send direction and 5K on receive direction. 59 */ 60 (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 61 max_size); 62 63 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 64 if (nxt_slow_path(sndbuf < 0)) { 65 goto getsockopt_fail; 66 } 67 68 size = sndbuf * 4; 69 70 if (rcvbuf < size) { 71 (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 72 size); 73 74 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 75 if (nxt_slow_path(rcvbuf < 0)) { 76 goto getsockopt_fail; 77 } 78 } 79 } 80 81 port->max_size = nxt_min(max_size, (size_t) sndbuf); 82 port->max_share = (64 * 1024); 83 84 return NXT_OK; 85 86 getsockopt_fail: 87 88 nxt_socket_close(task, port->pair[0]); 89 nxt_socket_close(task, port->pair[1]); 90 91 socketpair_fail: 92 93 return NXT_ERROR; 94 } 95 96 97 void 98 nxt_port_destroy(nxt_port_t *port) 99 { 100 nxt_socket_close(port->socket.task, port->socket.fd); 101 nxt_mp_destroy(port->mem_pool); 102 } 103 104 105 void 106 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 107 { 108 port->socket.fd = port->pair[1]; 109 port->socket.log = &nxt_main_log; 110 port->socket.write_ready = 1; 111 112 port->engine = task->thread->engine; 113 114 port->socket.write_work_queue = &port->engine->fast_work_queue; 115 port->socket.write_handler = nxt_port_write_handler; 116 port->socket.error_handler = nxt_port_error_handler; 117 118 if (port->iov == NULL) { 119 port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) * 120 NXT_IOBUF_MAX * 10); 121 port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 * 122 NXT_IOBUF_MAX * 10); 123 } 124 } 125 126 127 void 128 nxt_port_write_close(nxt_port_t *port) 129 { 130 nxt_socket_close(port->socket.task, port->pair[1]); 131 port->pair[1] = -1; 132 } 133 134 135 static void 136 nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) 137 { 138 nxt_event_engine_t *engine; 139 nxt_port_send_msg_t *msg; 140 141 msg = obj; 142 engine = data; 143 144 #if (NXT_DEBUG) 145 if (nxt_slow_path(data != msg->work.data)) { 146 nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", 147 data, msg->work.data); 148 nxt_abort(); 149 } 150 #endif 151 152 if (engine != task->thread->engine) { 153 154 nxt_debug(task, "current thread is %PT, expected %PT", 155 task->thread->tid, engine->task.thread->tid); 156 157 nxt_event_engine_post(engine, &msg->work); 158 159 return; 160 } 161 162 nxt_mp_free(engine->mem_pool, obj); 163 nxt_mp_release(engine->mem_pool); 164 } 165 166 167 static nxt_port_send_msg_t * 168 nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) 169 { 170 nxt_mp_t *mp; 171 nxt_port_send_msg_t *msg; 172 173 mp = task->thread->engine->mem_pool; 174 175 msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t)); 176 if (nxt_slow_path(msg == NULL)) { 177 return NULL; 178 } 179 180 nxt_mp_retain(mp); 181 182 msg->link.next = NULL; 183 msg->link.prev = NULL; 184 185 msg->buf = m->buf; 186 msg->fd = m->fd; 187 msg->close_fd = m->close_fd; 188 msg->port_msg = m->port_msg; 189 190 msg->work.next = NULL; 191 msg->work.handler = nxt_port_release_send_msg; 192 msg->work.task = task; 193 msg->work.obj = msg; 194 msg->work.data = task->thread->engine; 195 196 return msg; 197 } 198 199 200 static nxt_port_send_msg_t * 201 nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) 202 { 203 if (msg->work.data == NULL) { 204 msg = nxt_port_msg_create(task, msg); 205 } 206 207 if (msg != NULL) { 208 nxt_queue_insert_tail(&port->messages, &msg->link); 209 } 210 211 return msg; 212 } 213 214 215 static nxt_port_send_msg_t * 216 nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) 217 { 218 nxt_queue_link_t *lnk; 219 220 lnk = nxt_queue_first(&port->messages); 221 222 if (lnk == nxt_queue_tail(&port->messages)) { 223 return msg; 224 } 225 226 return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 227 } 228 229 230 nxt_int_t 231 nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 232 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 233 void *tracking) 234 { 235 nxt_port_send_msg_t msg, *res; 236 237 msg.link.next = NULL; 238 msg.link.prev = NULL; 239 240 msg.buf = b; 241 msg.fd = fd; 242 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 243 msg.share = 0; 244 245 if (tracking != NULL) { 246 nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 247 } 248 249 msg.port_msg.stream = stream; 250 msg.port_msg.pid = nxt_pid; 251 msg.port_msg.reply_port = reply_port; 252 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 253 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 254 msg.port_msg.mmap = 0; 255 msg.port_msg.nf = 0; 256 msg.port_msg.mf = 0; 257 msg.port_msg.tracking = tracking != NULL; 258 259 msg.work.data = NULL; 260 261 if (port->socket.write_ready) { 262 nxt_port_write_handler(task, &port->socket, &msg); 263 } else { 264 nxt_thread_mutex_lock(&port->write_mutex); 265 266 res = nxt_port_msg_push(task, port, &msg); 267 268 nxt_thread_mutex_unlock(&port->write_mutex); 269 270 if (res == NULL) { 271 return NXT_ERROR; 272 } 273 274 nxt_port_use(task, port, 1); 275 } 276 277 return NXT_OK; 278 } 279 280 281 static void 282 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 283 { 284 nxt_fd_event_block_write(task->thread->engine, &port->socket); 285 } 286 287 288 static void 289 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 290 { 291 nxt_fd_event_enable_write(task->thread->engine, &port->socket); 292 } 293 294 295 static void 296 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 297 { 298 int use_delta; 299 size_t plain_size; 300 ssize_t n; 301 nxt_bool_t block_write, enable_write; 302 nxt_port_t *port; 303 struct iovec *iov; 304 nxt_work_queue_t *wq; 305 nxt_port_method_t m; 306 nxt_port_send_msg_t *msg; 307 nxt_sendbuf_coalesce_t sb; 308 309 port = nxt_container_of(obj, nxt_port_t, socket); 310 311 block_write = 0; 312 enable_write = 0; 313 use_delta = 0; 314 315 nxt_thread_mutex_lock(&port->write_mutex); 316 317 iov = port->iov; 318 319 wq = &task->thread->engine->fast_work_queue; 320 321 do { 322 msg = nxt_port_msg_first(task, port, data); 323 324 if (msg == NULL) { 325 block_write = 1; 326 goto unlock_mutex; 327 } 328 329 iov[0].iov_base = &msg->port_msg; 330 iov[0].iov_len = sizeof(nxt_port_msg_t); 331 332 sb.buf = msg->buf; 333 sb.iobuf = &iov[1]; 334 sb.nmax = NXT_IOBUF_MAX - 1; 335 sb.sync = 0; 336 sb.last = 0; 337 sb.size = 0; 338 sb.limit = port->max_size; 339 340 sb.limit_reached = 0; 341 sb.nmax_reached = 0; 342 343 m = nxt_port_mmap_get_method(task, port, msg->buf); 344 345 if (m == NXT_PORT_METHOD_MMAP) { 346 sb.limit = (1ULL << 31) - 1; 347 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 348 port->max_size / PORT_MMAP_MIN_SIZE); 349 } 350 351 if (msg->port_msg.tracking) { 352 iov[0].iov_len += sizeof(msg->tracking_msg); 353 } 354 355 nxt_sendbuf_mem_coalesce(task, &sb); 356 357 plain_size = sb.size; 358 359 /* 360 * Send through mmap enabled only when payload 361 * is bigger than PORT_MMAP_MIN_SIZE. 362 */ 363 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 364 nxt_port_mmap_write(task, port, msg, &sb); 365 366 } else { 367 m = NXT_PORT_METHOD_PLAIN; 368 } 369 370 msg->port_msg.last |= sb.last; 371 msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 372 373 n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 374 375 if (n > 0) { 376 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 377 nxt_log(task, NXT_LOG_CRIT, 378 "port %d: short write: %z instead of %uz", 379 port->socket.fd, n, sb.size + iov[0].iov_len); 380 goto fail; 381 } 382 383 if (msg->fd != -1 && msg->close_fd != 0) { 384 nxt_fd_close(msg->fd); 385 386 msg->fd = -1; 387 } 388 389 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size, 390 m == NXT_PORT_METHOD_MMAP); 391 392 if (msg->buf != NULL) { 393 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 394 msg->port_msg.stream); 395 396 /* 397 * A file descriptor is sent only 398 * in the first message of a stream. 399 */ 400 msg->fd = -1; 401 msg->share += n; 402 msg->port_msg.nf = 1; 403 msg->port_msg.tracking = 0; 404 405 if (msg->share >= port->max_share) { 406 msg->share = 0; 407 408 if (msg->link.next != NULL) { 409 nxt_queue_remove(&msg->link); 410 use_delta--; 411 } 412 data = NULL; 413 414 if (nxt_port_msg_push(task, port, msg) != NULL) { 415 use_delta++; 416 } 417 } 418 419 } else { 420 if (msg->link.next != NULL) { 421 nxt_queue_remove(&msg->link); 422 use_delta--; 423 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 424 msg->work.data); 425 } 426 data = NULL; 427 } 428 429 } else if (nxt_slow_path(n == NXT_ERROR)) { 430 if (msg->link.next == NULL) { 431 if (nxt_port_msg_push(task, port, msg) != NULL) { 432 use_delta++; 433 } 434 } 435 goto fail; 436 } 437 438 /* n == NXT_AGAIN */ 439 440 } while (port->socket.write_ready); 441 442 if (nxt_fd_event_is_disabled(port->socket.write)) { 443 enable_write = 1; 444 } 445 446 goto unlock_mutex; 447 448 fail: 449 450 use_delta++; 451 452 nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 453 &port->socket); 454 455 unlock_mutex: 456 nxt_thread_mutex_unlock(&port->write_mutex); 457 458 if (block_write && nxt_fd_event_is_active(port->socket.write)) { 459 nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 460 } 461 462 if (enable_write) { 463 nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 464 } 465 466 if (use_delta != 0) { 467 nxt_port_use(task, port, use_delta); 468 } 469 } 470 471 472 void 473 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 474 { 475 port->socket.fd = port->pair[0]; 476 port->socket.log = &nxt_main_log; 477 478 port->engine = task->thread->engine; 479 480 port->socket.read_work_queue = &port->engine->fast_work_queue; 481 port->socket.read_handler = nxt_port_read_handler; 482 port->socket.error_handler = nxt_port_error_handler; 483 484 nxt_fd_event_enable_read(port->engine, &port->socket); 485 } 486 487 488 void 489 nxt_port_read_close(nxt_port_t *port) 490 { 491 port->socket.read_ready = 0; 492 nxt_socket_close(port->socket.task, port->pair[0]); 493 port->pair[0] = -1; 494 } 495 496 497 static void 498 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 499 { 500 ssize_t n; 501 nxt_buf_t *b; 502 nxt_port_t *port; 503 struct iovec iov[2]; 504 nxt_port_recv_msg_t msg; 505 506 port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 507 508 nxt_assert(port->engine == task->thread->engine); 509 510 for ( ;; ) { 511 512 b = nxt_port_buf_alloc(port); 513 514 if (nxt_slow_path(b == NULL)) { 515 /* TODO: disable event for some time */ 516 } 517 518 iov[0].iov_base = &msg.port_msg; 519 iov[0].iov_len = sizeof(nxt_port_msg_t); 520 521 iov[1].iov_base = b->mem.pos; 522 iov[1].iov_len = port->max_size; 523 524 n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 525 526 if (n > 0) { 527 528 msg.buf = b; 529 msg.size = n; 530 531 nxt_port_read_msg_process(task, port, &msg); 532 533 /* 534 * To disable instant completion or buffer re-usage, 535 * handler should reset 'msg.buf'. 536 */ 537 if (msg.buf == b) { 538 nxt_port_buf_free(port, b); 539 } 540 541 if (port->socket.read_ready) { 542 continue; 543 } 544 545 return; 546 } 547 548 if (n == NXT_AGAIN) { 549 nxt_port_buf_free(port, b); 550 551 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 552 return; 553 } 554 555 /* n == 0 || n == NXT_ERROR */ 556 557 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 558 nxt_port_error_handler, task, &port->socket, NULL); 559 return; 560 } 561 } 562 563 564 static nxt_int_t 565 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 566 { 567 nxt_port_recv_msg_t *fmsg; 568 569 fmsg = data; 570 571 if (lhq->key.length == sizeof(uint32_t) 572 && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream) 573 { 574 return NXT_OK; 575 } 576 577 return NXT_DECLINED; 578 } 579 580 581 static void * 582 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 583 { 584 return nxt_mp_alloc(ctx, size); 585 } 586 587 588 static void 589 nxt_port_lvlhsh_frag_free(void *ctx, void *p) 590 { 591 nxt_mp_free(ctx, p); 592 } 593 594 595 static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 596 NXT_LVLHSH_DEFAULT, 597 nxt_port_lvlhsh_frag_test, 598 nxt_port_lvlhsh_frag_alloc, 599 nxt_port_lvlhsh_frag_free, 600 }; 601 602 603 static nxt_port_recv_msg_t * 604 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 605 nxt_port_recv_msg_t *msg) 606 { 607 nxt_int_t res; 608 nxt_lvlhsh_query_t lhq; 609 nxt_port_recv_msg_t *fmsg; 610 611 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 612 613 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 614 615 if (nxt_slow_path(fmsg == NULL)) { 616 return NULL; 617 } 618 619 *fmsg = *msg; 620 621 lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t)); 622 lhq.key.length = sizeof(uint32_t); 623 lhq.key.start = (u_char *) &fmsg->port_msg.stream; 624 lhq.proto = &lvlhsh_frag_proto; 625 lhq.replace = 0; 626 lhq.value = fmsg; 627 lhq.pool = port->mem_pool; 628 629 res = nxt_lvlhsh_insert(&port->frags, &lhq); 630 631 switch (res) { 632 633 case NXT_OK: 634 return fmsg; 635 636 case NXT_DECLINED: 637 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 638 fmsg->port_msg.stream); 639 nxt_mp_free(port->mem_pool, fmsg); 640 641 return NULL; 642 643 default: 644 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 645 fmsg->port_msg.stream); 646 647 nxt_mp_free(port->mem_pool, fmsg); 648 649 return NULL; 650 651 } 652 } 653 654 655 static nxt_port_recv_msg_t * 656 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, 657 nxt_bool_t last) 658 { 659 nxt_int_t res; 660 nxt_lvlhsh_query_t lhq; 661 662 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream); 663 664 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t)); 665 lhq.key.length = sizeof(uint32_t); 666 lhq.key.start = (u_char *) &stream; 667 lhq.proto = &lvlhsh_frag_proto; 668 lhq.pool = port->mem_pool; 669 670 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 671 nxt_lvlhsh_find(&port->frags, &lhq); 672 673 switch (res) { 674 675 case NXT_OK: 676 return lhq.value; 677 678 default: 679 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream); 680 681 return NULL; 682 } 683 } 684 685 686 static void 687 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 688 nxt_port_recv_msg_t *msg) 689 { 690 nxt_buf_t *b, *orig_b; 691 nxt_port_recv_msg_t *fmsg; 692 693 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 694 nxt_log(task, NXT_LOG_CRIT, 695 "port %d: too small message:%uz", port->socket.fd, msg->size); 696 697 if (msg->fd != -1) { 698 nxt_fd_close(msg->fd); 699 } 700 701 return; 702 } 703 704 /* adjust size to actual buffer used size */ 705 msg->size -= sizeof(nxt_port_msg_t); 706 707 b = orig_b = msg->buf; 708 b->mem.free += msg->size; 709 710 if (msg->port_msg.tracking) { 711 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 712 713 } else { 714 msg->cancelled = 0; 715 } 716 717 if (nxt_slow_path(msg->port_msg.nf != 0)) { 718 719 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, 720 msg->port_msg.mf == 0); 721 722 if (nxt_slow_path(fmsg == NULL)) { 723 goto fmsg_failed; 724 } 725 726 if (nxt_fast_path(fmsg->cancelled == 0)) { 727 728 if (msg->port_msg.mmap) { 729 nxt_port_mmap_read(task, msg); 730 } 731 732 nxt_buf_chain_add(&fmsg->buf, msg->buf); 733 734 fmsg->size += msg->size; 735 msg->buf = NULL; 736 b = NULL; 737 738 if (nxt_fast_path(msg->port_msg.mf == 0)) { 739 740 b = fmsg->buf; 741 742 port->handler(task, fmsg); 743 744 msg->buf = fmsg->buf; 745 msg->fd = fmsg->fd; 746 } 747 } 748 749 if (nxt_fast_path(msg->port_msg.mf == 0)) { 750 nxt_mp_free(port->mem_pool, fmsg); 751 } 752 } else { 753 if (nxt_slow_path(msg->port_msg.mf != 0)) { 754 755 if (msg->port_msg.mmap && msg->cancelled == 0) { 756 nxt_port_mmap_read(task, msg); 757 b = msg->buf; 758 } 759 760 fmsg = nxt_port_frag_start(task, port, msg); 761 762 if (nxt_slow_path(fmsg == NULL)) { 763 goto fmsg_failed; 764 } 765 766 fmsg->port_msg.nf = 0; 767 fmsg->port_msg.mf = 0; 768 769 if (nxt_fast_path(msg->cancelled == 0)) { 770 msg->buf = NULL; 771 msg->fd = -1; 772 b = NULL; 773 774 } else { 775 if (msg->fd != -1) { 776 nxt_fd_close(msg->fd); 777 } 778 } 779 } else { 780 if (nxt_fast_path(msg->cancelled == 0)) { 781 782 if (msg->port_msg.mmap) { 783 nxt_port_mmap_read(task, msg); 784 b = msg->buf; 785 } 786 787 port->handler(task, msg); 788 } 789 } 790 } 791 792 fmsg_failed: 793 794 if (msg->port_msg.mmap && orig_b != b) { 795 796 /* 797 * To disable instant buffer completion, 798 * handler should reset 'msg->buf'. 799 */ 800 if (msg->buf == b) { 801 /* complete mmap buffers */ 802 for (; b != NULL; b = b->next) { 803 nxt_debug(task, "complete buffer %p", b); 804 805 nxt_work_queue_add(port->socket.read_work_queue, 806 b->completion_handler, task, b, b->parent); 807 } 808 } 809 810 /* restore original buf */ 811 msg->buf = orig_b; 812 } 813 } 814 815 816 static nxt_buf_t * 817 nxt_port_buf_alloc(nxt_port_t *port) 818 { 819 nxt_buf_t *b; 820 821 if (port->free_bufs != NULL) { 822 b = port->free_bufs; 823 port->free_bufs = b->next; 824 825 b->mem.pos = b->mem.start; 826 b->mem.free = b->mem.start; 827 b->next = NULL; 828 } else { 829 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 830 if (nxt_slow_path(b == NULL)) { 831 return NULL; 832 } 833 } 834 835 return b; 836 } 837 838 839 static void 840 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 841 { 842 b->next = port->free_bufs; 843 port->free_bufs = b; 844 } 845 846 847 static void 848 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 849 { 850 int use_delta; 851 nxt_buf_t *b; 852 nxt_port_t *port; 853 nxt_work_queue_t *wq; 854 nxt_port_send_msg_t *msg; 855 856 nxt_debug(task, "port error handler %p", obj); 857 /* TODO */ 858 859 port = nxt_container_of(obj, nxt_port_t, socket); 860 861 use_delta = 0; 862 863 if (obj == data) { 864 use_delta--; 865 } 866 867 wq = &task->thread->engine->fast_work_queue; 868 869 nxt_thread_mutex_lock(&port->write_mutex); 870 871 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 872 873 for (b = msg->buf; b != NULL; b = b->next) { 874 if (nxt_buf_is_sync(b)) { 875 continue; 876 } 877 878 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 879 } 880 881 nxt_queue_remove(&msg->link); 882 use_delta--; 883 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 884 msg->work.data); 885 886 } nxt_queue_loop; 887 888 nxt_thread_mutex_unlock(&port->write_mutex); 889 890 if (use_delta != 0) { 891 nxt_port_use(task, port, use_delta); 892 } 893 } 894