Deleted
Added
nxt_unit.c (1437:1990c369a0b9) | nxt_unit.c (1438:5e5a3897e0cd) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 2621 unchanged lines hidden (view full) --- 2630 2631 return read; 2632} 2633 2634 2635void 2636nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2637{ | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 2621 unchanged lines hidden (view full) --- 2630 2631 return read; 2632} 2633 2634 2635void 2636nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2637{ |
2638 ssize_t res; | |
2639 uint32_t size; 2640 nxt_port_msg_t msg; 2641 nxt_unit_impl_t *lib; 2642 nxt_unit_request_info_impl_t *req_impl; 2643 2644 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2645 2646 nxt_unit_req_debug(req, "done: %d", rc); --- 38 unchanged lines hidden (view full) --- 2685 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2686 : _NXT_PORT_MSG_RPC_ERROR; 2687 msg.last = 1; 2688 msg.mmap = 0; 2689 msg.nf = 0; 2690 msg.mf = 0; 2691 msg.tracking = 0; 2692 | 2638 uint32_t size; 2639 nxt_port_msg_t msg; 2640 nxt_unit_impl_t *lib; 2641 nxt_unit_request_info_impl_t *req_impl; 2642 2643 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2644 2645 nxt_unit_req_debug(req, "done: %d", rc); --- 38 unchanged lines hidden (view full) --- 2684 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2685 : _NXT_PORT_MSG_RPC_ERROR; 2686 msg.last = 1; 2687 msg.mmap = 0; 2688 msg.nf = 0; 2689 msg.mf = 0; 2690 msg.tracking = 0; 2691 |
2693 res = lib->callbacks.port_send(req->ctx, &req->response_port, 2694 &msg, sizeof(msg), NULL, 0); 2695 if (nxt_slow_path(res != sizeof(msg))) { 2696 nxt_unit_req_alert(req, "last message send failed: %s (%d)", 2697 strerror(errno), errno); 2698 } | 2692 (void) lib->callbacks.port_send(req->ctx, &req->response_port, 2693 &msg, sizeof(msg), NULL, 0); |
2699 2700 nxt_unit_request_info_release(req); 2701} 2702 2703 2704int 2705nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2706 uint8_t last, const void *start, size_t size) --- 301 unchanged lines hidden (view full) --- 3008 msg.last = 0; 3009 msg.mmap = 0; 3010 msg.nf = 0; 3011 msg.mf = 0; 3012 msg.tracking = 0; 3013 3014 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 3015 if (nxt_slow_path(res != sizeof(msg))) { | 2694 2695 nxt_unit_request_info_release(req); 2696} 2697 2698 2699int 2700nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2701 uint8_t last, const void *start, size_t size) --- 301 unchanged lines hidden (view full) --- 3003 msg.last = 0; 3004 msg.mmap = 0; 3005 msg.nf = 0; 3006 msg.mf = 0; 3007 msg.tracking = 0; 3008 3009 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 3010 if (nxt_slow_path(res != sizeof(msg))) { |
3016 nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)", 3017 (int) port_id->pid, strerror(errno), errno); 3018 | |
3019 return NXT_UNIT_ERROR; 3020 } 3021 3022 return NXT_UNIT_OK; 3023} 3024 3025 3026static int --- 7 unchanged lines hidden (view full) --- 3034 3035 while (1) { 3036 rbuf = nxt_unit_read_buf_get(ctx); 3037 if (nxt_slow_path(rbuf == NULL)) { 3038 return NXT_UNIT_ERROR; 3039 } 3040 3041 nxt_unit_read_buf(ctx, rbuf); | 3011 return NXT_UNIT_ERROR; 3012 } 3013 3014 return NXT_UNIT_OK; 3015} 3016 3017 3018static int --- 7 unchanged lines hidden (view full) --- 3026 3027 while (1) { 3028 rbuf = nxt_unit_read_buf_get(ctx); 3029 if (nxt_slow_path(rbuf == NULL)) { 3030 return NXT_UNIT_ERROR; 3031 } 3032 3033 nxt_unit_read_buf(ctx, rbuf); |
3034 |
|
3042 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 3043 nxt_unit_read_buf_release(ctx, rbuf); 3044 3045 return NXT_UNIT_ERROR; 3046 } 3047 3048 port_msg = (nxt_port_msg_t *) rbuf->buf; 3049 --- 239 unchanged lines hidden (view full) --- 3289 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3290 * in the same simple assignment as in the code above. 3291 */ 3292 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3293 3294 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), 3295 &cmsg, sizeof(cmsg)); 3296 if (nxt_slow_path(res != sizeof(msg))) { | 3035 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 3036 nxt_unit_read_buf_release(ctx, rbuf); 3037 3038 return NXT_UNIT_ERROR; 3039 } 3040 3041 port_msg = (nxt_port_msg_t *) rbuf->buf; 3042 --- 239 unchanged lines hidden (view full) --- 3282 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3283 * in the same simple assignment as in the code above. 3284 */ 3285 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3286 3287 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), 3288 &cmsg, sizeof(cmsg)); 3289 if (nxt_slow_path(res != sizeof(msg))) { |
3297 nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)", 3298 (int) port_id->pid, strerror(errno), errno); 3299 | |
3300 return NXT_UNIT_ERROR; 3301 } 3302 3303 return NXT_UNIT_OK; 3304} 3305 3306 3307static int --- 426 unchanged lines hidden (view full) --- 3734 msg.last = 0; 3735 msg.mmap = 0; 3736 msg.nf = 0; 3737 msg.mf = 0; 3738 msg.tracking = 0; 3739 3740 res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); 3741 if (nxt_slow_path(res != sizeof(msg))) { | 3290 return NXT_UNIT_ERROR; 3291 } 3292 3293 return NXT_UNIT_OK; 3294} 3295 3296 3297static int --- 426 unchanged lines hidden (view full) --- 3724 msg.last = 0; 3725 msg.mmap = 0; 3726 msg.nf = 0; 3727 msg.mf = 0; 3728 msg.tracking = 0; 3729 3730 res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); 3731 if (nxt_slow_path(res != sizeof(msg))) { |
3742 nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", 3743 (int) port_id.pid, strerror(errno), errno); 3744 | |
3745 return NXT_UNIT_ERROR; 3746 } 3747 3748 return NXT_UNIT_OK; 3749} 3750 3751 3752static nxt_int_t --- 136 unchanged lines hidden (view full) --- 3889 int rc; 3890 nxt_unit_impl_t *lib; 3891 3892 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3893 rc = NXT_UNIT_OK; 3894 3895 while (nxt_fast_path(lib->online)) { 3896 rc = nxt_unit_run_once(ctx); | 3732 return NXT_UNIT_ERROR; 3733 } 3734 3735 return NXT_UNIT_OK; 3736} 3737 3738 3739static nxt_int_t --- 136 unchanged lines hidden (view full) --- 3876 int rc; 3877 nxt_unit_impl_t *lib; 3878 3879 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3880 rc = NXT_UNIT_OK; 3881 3882 while (nxt_fast_path(lib->online)) { 3883 rc = nxt_unit_run_once(ctx); |
3884 3885 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3886 break; 3887 } |
|
3897 } 3898 3899 return rc; 3900} 3901 3902 3903int 3904nxt_unit_run_once(nxt_unit_ctx_t *ctx) --- 642 unchanged lines hidden (view full) --- 4547 msg.msg_name = NULL; 4548 msg.msg_namelen = 0; 4549 msg.msg_iov = iov; 4550 msg.msg_iovlen = 1; 4551 msg.msg_flags = 0; 4552 msg.msg_control = (void *) oob; 4553 msg.msg_controllen = oob_size; 4554 | 3888 } 3889 3890 return rc; 3891} 3892 3893 3894int 3895nxt_unit_run_once(nxt_unit_ctx_t *ctx) --- 642 unchanged lines hidden (view full) --- 4538 msg.msg_name = NULL; 4539 msg.msg_namelen = 0; 4540 msg.msg_iov = iov; 4541 msg.msg_iovlen = 1; 4542 msg.msg_flags = 0; 4543 msg.msg_control = (void *) oob; 4544 msg.msg_controllen = oob_size; 4545 |
4546retry: 4547 |
|
4555 res = sendmsg(fd, &msg, 0); 4556 4557 if (nxt_slow_path(res == -1)) { | 4548 res = sendmsg(fd, &msg, 0); 4549 4550 if (nxt_slow_path(res == -1)) { |
4558 nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)", | 4551 if (errno == EINTR) { 4552 goto retry; 4553 } 4554 4555 /* 4556 * FIXME: This should be "alert" after router graceful shutdown 4557 * implementation. 4558 */ 4559 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", |
4559 fd, (int) buf_size, strerror(errno), errno); 4560 4561 } else { | 4560 fd, (int) buf_size, strerror(errno), errno); 4561 4562 } else { |
4562 nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size, | 4563 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, |
4563 (int) res); 4564 } 4565 4566 return res; 4567} 4568 4569 4570static ssize_t --- 53 unchanged lines hidden (view full) --- 4624 msg.msg_name = NULL; 4625 msg.msg_namelen = 0; 4626 msg.msg_iov = iov; 4627 msg.msg_iovlen = 1; 4628 msg.msg_flags = 0; 4629 msg.msg_control = oob; 4630 msg.msg_controllen = oob_size; 4631 | 4564 (int) res); 4565 } 4566 4567 return res; 4568} 4569 4570 4571static ssize_t --- 53 unchanged lines hidden (view full) --- 4625 msg.msg_name = NULL; 4626 msg.msg_namelen = 0; 4627 msg.msg_iov = iov; 4628 msg.msg_iovlen = 1; 4629 msg.msg_flags = 0; 4630 msg.msg_control = oob; 4631 msg.msg_controllen = oob_size; 4632 |
4633retry: 4634 |
|
4632 res = recvmsg(fd, &msg, 0); 4633 4634 if (nxt_slow_path(res == -1)) { | 4635 res = recvmsg(fd, &msg, 0); 4636 4637 if (nxt_slow_path(res == -1)) { |
4635 nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)", 4636 fd, strerror(errno), errno); | 4638 if (errno == EINTR) { 4639 goto retry; 4640 } |
4637 | 4641 |
4642 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", 4643 fd, strerror(errno), errno); 4644 |
|
4638 } else { | 4645 } else { |
4639 nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res); | 4646 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); |
4640 } 4641 4642 return res; 4643} 4644 4645 4646static nxt_int_t 4647nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) --- 341 unchanged lines hidden --- | 4647 } 4648 4649 return res; 4650} 4651 4652 4653static nxt_int_t 4654nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) --- 341 unchanged lines hidden --- |