Deleted
Added
nxt_unit.c (1557:a9f991601a8b) | nxt_unit.c (1558:026e4b909b61) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 197 unchanged lines hidden (view full) --- 206 nxt_port_id_t reply_port; 207 208 uint8_t last; /* 1 bit */ 209 uint8_t mmap; /* 1 bit */ 210 211 void *start; 212 uint32_t size; 213 | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 197 unchanged lines hidden (view full) --- 206 nxt_port_id_t reply_port; 207 208 uint8_t last; /* 1 bit */ 209 uint8_t mmap; /* 1 bit */ 210 211 void *start; 212 uint32_t size; 213 |
214 int fd; 215 int fd2; | 214 int fd[2]; |
216 217 nxt_unit_mmap_buf_t *incoming_buf; 218}; 219 220 221typedef enum { 222 NXT_UNIT_RS_START = 0, 223 NXT_UNIT_RS_RESPONSE_INIT, --- 671 unchanged lines hidden (view full) --- 895 struct cmsghdr *cm; 896 nxt_port_msg_t *port_msg; 897 nxt_unit_impl_t *lib; 898 nxt_unit_recv_msg_t recv_msg; 899 900 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 901 902 rc = NXT_UNIT_ERROR; | 215 216 nxt_unit_mmap_buf_t *incoming_buf; 217}; 218 219 220typedef enum { 221 NXT_UNIT_RS_START = 0, 222 NXT_UNIT_RS_RESPONSE_INIT, --- 671 unchanged lines hidden (view full) --- 894 struct cmsghdr *cm; 895 nxt_port_msg_t *port_msg; 896 nxt_unit_impl_t *lib; 897 nxt_unit_recv_msg_t recv_msg; 898 899 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 900 901 rc = NXT_UNIT_ERROR; |
903 recv_msg.fd = -1; 904 recv_msg.fd2 = -1; | 902 recv_msg.fd[0] = -1; 903 recv_msg.fd[1] = -1; |
905 port_msg = (nxt_port_msg_t *) rbuf->buf; 906 cm = (struct cmsghdr *) rbuf->oob; 907 908 if (cm->cmsg_level == SOL_SOCKET 909 && cm->cmsg_type == SCM_RIGHTS) 910 { 911 if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { | 904 port_msg = (nxt_port_msg_t *) rbuf->buf; 905 cm = (struct cmsghdr *) rbuf->oob; 906 907 if (cm->cmsg_level == SOL_SOCKET 908 && cm->cmsg_type == SCM_RIGHTS) 909 { 910 if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { |
912 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); | 911 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); |
913 } 914 915 if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { | 912 } 913 914 if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { |
916 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); | 915 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); |
917 } 918 } 919 920 recv_msg.incoming_buf = NULL; 921 922 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 923 if (nxt_slow_path(rbuf->size == 0)) { 924 nxt_unit_debug(ctx, "read port closed"); 925 926 nxt_unit_quit(ctx); 927 rc = NXT_UNIT_OK; 928 929 goto fail; 930 } 931 932 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); 933 goto fail; 934 } 935 | 916 } 917 } 918 919 recv_msg.incoming_buf = NULL; 920 921 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 922 if (nxt_slow_path(rbuf->size == 0)) { 923 nxt_unit_debug(ctx, "read port closed"); 924 925 nxt_unit_quit(ctx); 926 rc = NXT_UNIT_OK; 927 928 goto fail; 929 } 930 931 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); 932 goto fail; 933 } 934 |
936 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", | 935 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", |
937 port_msg->stream, (int) port_msg->type, | 936 port_msg->stream, (int) port_msg->type, |
938 recv_msg.fd, recv_msg.fd2); | 937 recv_msg.fd[0], recv_msg.fd[1]); |
939 940 recv_msg.stream = port_msg->stream; 941 recv_msg.pid = port_msg->pid; 942 recv_msg.reply_port = port_msg->reply_port; 943 recv_msg.last = port_msg->last; 944 recv_msg.mmap = port_msg->mmap; 945 946 recv_msg.start = port_msg + 1; --- 12 unchanged lines hidden (view full) --- 959 goto fail; 960 } 961 962 if (port_msg->mmap) { 963 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 964 965 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 966 if (rc == NXT_UNIT_AGAIN) { | 938 939 recv_msg.stream = port_msg->stream; 940 recv_msg.pid = port_msg->pid; 941 recv_msg.reply_port = port_msg->reply_port; 942 recv_msg.last = port_msg->last; 943 recv_msg.mmap = port_msg->mmap; 944 945 recv_msg.start = port_msg + 1; --- 12 unchanged lines hidden (view full) --- 958 goto fail; 959 } 960 961 if (port_msg->mmap) { 962 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 963 964 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 965 if (rc == NXT_UNIT_AGAIN) { |
967 recv_msg.fd = -1; 968 recv_msg.fd2 = -1; | 966 recv_msg.fd[0] = -1; 967 recv_msg.fd[1] = -1; |
969 } 970 971 goto fail; 972 } 973 } 974 975 switch (port_msg->type) { 976 --- 5 unchanged lines hidden (view full) --- 982 break; 983 984 case _NXT_PORT_MSG_NEW_PORT: 985 rc = nxt_unit_process_new_port(ctx, &recv_msg); 986 break; 987 988 case _NXT_PORT_MSG_CHANGE_FILE: 989 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", | 968 } 969 970 goto fail; 971 } 972 } 973 974 switch (port_msg->type) { 975 --- 5 unchanged lines hidden (view full) --- 981 break; 982 983 case _NXT_PORT_MSG_NEW_PORT: 984 rc = nxt_unit_process_new_port(ctx, &recv_msg); 985 break; 986 987 case _NXT_PORT_MSG_CHANGE_FILE: 988 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", |
990 port_msg->stream, recv_msg.fd); | 989 port_msg->stream, recv_msg.fd[0]); |
991 | 990 |
992 if (dup2(recv_msg.fd, lib->log_fd) == -1) { | 991 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { |
993 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", | 992 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", |
994 port_msg->stream, recv_msg.fd, lib->log_fd, | 993 port_msg->stream, recv_msg.fd[0], lib->log_fd, |
995 strerror(errno), errno); 996 997 goto fail; 998 } 999 1000 rc = NXT_UNIT_OK; 1001 break; 1002 1003 case _NXT_PORT_MSG_MMAP: | 994 strerror(errno), errno); 995 996 goto fail; 997 } 998 999 rc = NXT_UNIT_OK; 1000 break; 1001 1002 case _NXT_PORT_MSG_MMAP: |
1004 if (nxt_slow_path(recv_msg.fd < 0)) { | 1003 if (nxt_slow_path(recv_msg.fd[0] < 0)) { |
1005 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", | 1004 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", |
1006 port_msg->stream, recv_msg.fd); | 1005 port_msg->stream, recv_msg.fd[0]); |
1007 1008 goto fail; 1009 } 1010 | 1006 1007 goto fail; 1008 } 1009 |
1011 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); | 1010 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); |
1012 break; 1013 1014 case _NXT_PORT_MSG_REQ_HEADERS: 1015 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 1016 break; 1017 1018 case _NXT_PORT_MSG_REQ_BODY: 1019 rc = nxt_unit_process_req_body(ctx, &recv_msg); --- 30 unchanged lines hidden (view full) --- 1050 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 1051 port_msg->stream, (int) port_msg->type); 1052 1053 goto fail; 1054 } 1055 1056fail: 1057 | 1011 break; 1012 1013 case _NXT_PORT_MSG_REQ_HEADERS: 1014 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 1015 break; 1016 1017 case _NXT_PORT_MSG_REQ_BODY: 1018 rc = nxt_unit_process_req_body(ctx, &recv_msg); --- 30 unchanged lines hidden (view full) --- 1049 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 1050 port_msg->stream, (int) port_msg->type); 1051 1052 goto fail; 1053 } 1054 1055fail: 1056 |
1058 if (recv_msg.fd != -1) { 1059 nxt_unit_close(recv_msg.fd); | 1057 if (recv_msg.fd[0] != -1) { 1058 nxt_unit_close(recv_msg.fd[0]); |
1060 } 1061 | 1059 } 1060 |
1062 if (recv_msg.fd2 != -1) { 1063 nxt_unit_close(recv_msg.fd2); | 1061 if (recv_msg.fd[1] != -1) { 1062 nxt_unit_close(recv_msg.fd[1]); |
1064 } 1065 1066 while (recv_msg.incoming_buf != NULL) { 1067 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 1068 } 1069 1070 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { 1071#if (NXT_DEBUG) --- 17 unchanged lines hidden (view full) --- 1089 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 1090 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 1091 "invalid message size (%d)", 1092 recv_msg->stream, (int) recv_msg->size); 1093 1094 return NXT_UNIT_ERROR; 1095 } 1096 | 1063 } 1064 1065 while (recv_msg.incoming_buf != NULL) { 1066 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 1067 } 1068 1069 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { 1070#if (NXT_DEBUG) --- 17 unchanged lines hidden (view full) --- 1088 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 1089 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 1090 "invalid message size (%d)", 1091 recv_msg->stream, (int) recv_msg->size); 1092 1093 return NXT_UNIT_ERROR; 1094 } 1095 |
1097 if (nxt_slow_path(recv_msg->fd < 0)) { | 1096 if (nxt_slow_path(recv_msg->fd[0] < 0)) { |
1098 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", | 1097 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", |
1099 recv_msg->stream, recv_msg->fd); | 1098 recv_msg->stream, recv_msg->fd[0]); |
1100 1101 return NXT_UNIT_ERROR; 1102 } 1103 1104 new_port_msg = recv_msg->start; 1105 | 1099 1100 return NXT_UNIT_ERROR; 1101 } 1102 1103 new_port_msg = recv_msg->start; 1104 |
1106 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", | 1105 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d", |
1107 recv_msg->stream, (int) new_port_msg->pid, | 1106 recv_msg->stream, (int) new_port_msg->pid, |
1108 (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); | 1107 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); |
1109 1110 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1111 1112 if (new_port_msg->id == (nxt_port_id_t) -1) { 1113 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); 1114 | 1108 1109 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1110 1111 if (new_port_msg->id == (nxt_port_id_t) -1) { 1112 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); 1113 |
1115 new_port.in_fd = recv_msg->fd; | 1114 new_port.in_fd = recv_msg->fd[0]; |
1116 new_port.out_fd = -1; 1117 1118 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, | 1115 new_port.out_fd = -1; 1116 1117 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, |
1119 MAP_SHARED, recv_msg->fd2, 0); | 1118 MAP_SHARED, recv_msg->fd[1], 0); |
1120 1121 } else { | 1119 1120 } else { |
1122 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) { | 1121 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) 1122 != NXT_UNIT_OK)) 1123 { |
1123 return NXT_UNIT_ERROR; 1124 } 1125 1126 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 1127 new_port_msg->id); 1128 1129 new_port.in_fd = -1; | 1124 return NXT_UNIT_ERROR; 1125 } 1126 1127 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 1128 new_port_msg->id); 1129 1130 new_port.in_fd = -1; |
1130 new_port.out_fd = recv_msg->fd; | 1131 new_port.out_fd = recv_msg->fd[0]; |
1131 1132 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, | 1132 1133 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, |
1133 MAP_SHARED, recv_msg->fd2, 0); | 1134 MAP_SHARED, recv_msg->fd[1], 0); |
1134 } 1135 1136 if (nxt_slow_path(mem == MAP_FAILED)) { | 1135 } 1136 1137 if (nxt_slow_path(mem == MAP_FAILED)) { |
1137 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, | 1138 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], |
1138 strerror(errno), errno); 1139 1140 return NXT_UNIT_ERROR; 1141 } 1142 1143 new_port.data = NULL; 1144 | 1139 strerror(errno), errno); 1140 1141 return NXT_UNIT_ERROR; 1142 } 1143 1144 new_port.data = NULL; 1145 |
1145 recv_msg->fd = -1; | 1146 recv_msg->fd[0] = -1; |
1146 1147 port = nxt_unit_add_port(ctx, &new_port, mem); 1148 if (nxt_slow_path(port == NULL)) { 1149 return NXT_UNIT_ERROR; 1150 } 1151 1152 if (new_port_msg->id == (nxt_port_id_t) -1) { 1153 lib->shared_port = port; --- 65 unchanged lines hidden (view full) --- 1219 b->req = req; 1220 } 1221 1222 /* "Move" incoming buffer list to req_impl. */ 1223 req_impl->incoming_buf = recv_msg->incoming_buf; 1224 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 1225 recv_msg->incoming_buf = NULL; 1226 | 1147 1148 port = nxt_unit_add_port(ctx, &new_port, mem); 1149 if (nxt_slow_path(port == NULL)) { 1150 return NXT_UNIT_ERROR; 1151 } 1152 1153 if (new_port_msg->id == (nxt_port_id_t) -1) { 1154 lib->shared_port = port; --- 65 unchanged lines hidden (view full) --- 1220 b->req = req; 1221 } 1222 1223 /* "Move" incoming buffer list to req_impl. */ 1224 req_impl->incoming_buf = recv_msg->incoming_buf; 1225 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 1226 recv_msg->incoming_buf = NULL; 1227 |
1227 req->content_fd = recv_msg->fd; 1228 recv_msg->fd = -1; | 1228 req->content_fd = recv_msg->fd[0]; 1229 recv_msg->fd[0] = -1; |
1229 1230 req->response_max_fields = 0; 1231 req_impl->state = NXT_UNIT_RS_START; 1232 req_impl->websocket = 0; 1233 req_impl->in_hash = 0; 1234 1235 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 1236 (int) r->method_length, --- 70 unchanged lines hidden (view full) --- 1307 if (recv_msg->incoming_buf != NULL) { 1308 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); 1309 1310 /* "Move" incoming buffer list to req_impl. */ 1311 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); 1312 recv_msg->incoming_buf = NULL; 1313 } 1314 | 1230 1231 req->response_max_fields = 0; 1232 req_impl->state = NXT_UNIT_RS_START; 1233 req_impl->websocket = 0; 1234 req_impl->in_hash = 0; 1235 1236 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 1237 (int) r->method_length, --- 70 unchanged lines hidden (view full) --- 1308 if (recv_msg->incoming_buf != NULL) { 1309 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); 1310 1311 /* "Move" incoming buffer list to req_impl. */ 1312 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); 1313 recv_msg->incoming_buf = NULL; 1314 } 1315 |
1315 req->content_fd = recv_msg->fd; 1316 recv_msg->fd = -1; | 1316 req->content_fd = recv_msg->fd[0]; 1317 recv_msg->fd[0] = -1; |
1317 1318 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1319 1320 if (lib->callbacks.data_handler != NULL) { 1321 lib->callbacks.data_handler(req); 1322 1323 return NXT_UNIT_OK; 1324 } --- 5004 unchanged lines hidden --- | 1318 1319 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1320 1321 if (lib->callbacks.data_handler != NULL) { 1322 lib->callbacks.data_handler(req); 1323 1324 return NXT_UNIT_OK; 1325 } --- 5004 unchanged lines hidden --- |