nxt_unit.c (1557:a9f991601a8b) nxt_unit.c (1558:026e4b909b61)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 197 unchanged lines hidden (view full) ---

206 nxt_port_id_t reply_port;
207
208 uint8_t last; /* 1 bit */
209 uint8_t mmap; /* 1 bit */
210
211 void *start;
212 uint32_t size;
213
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 197 unchanged lines hidden (view full) ---

206 nxt_port_id_t reply_port;
207
208 uint8_t last; /* 1 bit */
209 uint8_t mmap; /* 1 bit */
210
211 void *start;
212 uint32_t size;
213
214 int fd;
215 int fd2;
214 int fd[2];
216
217 nxt_unit_mmap_buf_t *incoming_buf;
218};
219
220
221typedef enum {
222 NXT_UNIT_RS_START = 0,
223 NXT_UNIT_RS_RESPONSE_INIT,

--- 671 unchanged lines hidden (view full) ---

895 struct cmsghdr *cm;
896 nxt_port_msg_t *port_msg;
897 nxt_unit_impl_t *lib;
898 nxt_unit_recv_msg_t recv_msg;
899
900 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
901
902 rc = NXT_UNIT_ERROR;
215
216 nxt_unit_mmap_buf_t *incoming_buf;
217};
218
219
220typedef enum {
221 NXT_UNIT_RS_START = 0,
222 NXT_UNIT_RS_RESPONSE_INIT,

--- 671 unchanged lines hidden (view full) ---

894 struct cmsghdr *cm;
895 nxt_port_msg_t *port_msg;
896 nxt_unit_impl_t *lib;
897 nxt_unit_recv_msg_t recv_msg;
898
899 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
900
901 rc = NXT_UNIT_ERROR;
903 recv_msg.fd = -1;
904 recv_msg.fd2 = -1;
902 recv_msg.fd[0] = -1;
903 recv_msg.fd[1] = -1;
905 port_msg = (nxt_port_msg_t *) rbuf->buf;
906 cm = (struct cmsghdr *) rbuf->oob;
907
908 if (cm->cmsg_level == SOL_SOCKET
909 && cm->cmsg_type == SCM_RIGHTS)
910 {
911 if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
904 port_msg = (nxt_port_msg_t *) rbuf->buf;
905 cm = (struct cmsghdr *) rbuf->oob;
906
907 if (cm->cmsg_level == SOL_SOCKET
908 && cm->cmsg_type == SCM_RIGHTS)
909 {
910 if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
912 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
911 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
913 }
914
915 if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
912 }
913
914 if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
916 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
915 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
917 }
918 }
919
920 recv_msg.incoming_buf = NULL;
921
922 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
923 if (nxt_slow_path(rbuf->size == 0)) {
924 nxt_unit_debug(ctx, "read port closed");
925
926 nxt_unit_quit(ctx);
927 rc = NXT_UNIT_OK;
928
929 goto fail;
930 }
931
932 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
933 goto fail;
934 }
935
916 }
917 }
918
919 recv_msg.incoming_buf = NULL;
920
921 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
922 if (nxt_slow_path(rbuf->size == 0)) {
923 nxt_unit_debug(ctx, "read port closed");
924
925 nxt_unit_quit(ctx);
926 rc = NXT_UNIT_OK;
927
928 goto fail;
929 }
930
931 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
932 goto fail;
933 }
934
936 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
935 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
937 port_msg->stream, (int) port_msg->type,
936 port_msg->stream, (int) port_msg->type,
938 recv_msg.fd, recv_msg.fd2);
937 recv_msg.fd[0], recv_msg.fd[1]);
939
940 recv_msg.stream = port_msg->stream;
941 recv_msg.pid = port_msg->pid;
942 recv_msg.reply_port = port_msg->reply_port;
943 recv_msg.last = port_msg->last;
944 recv_msg.mmap = port_msg->mmap;
945
946 recv_msg.start = port_msg + 1;

--- 12 unchanged lines hidden (view full) ---

959 goto fail;
960 }
961
962 if (port_msg->mmap) {
963 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
964
965 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
966 if (rc == NXT_UNIT_AGAIN) {
938
939 recv_msg.stream = port_msg->stream;
940 recv_msg.pid = port_msg->pid;
941 recv_msg.reply_port = port_msg->reply_port;
942 recv_msg.last = port_msg->last;
943 recv_msg.mmap = port_msg->mmap;
944
945 recv_msg.start = port_msg + 1;

--- 12 unchanged lines hidden (view full) ---

958 goto fail;
959 }
960
961 if (port_msg->mmap) {
962 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
963
964 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
965 if (rc == NXT_UNIT_AGAIN) {
967 recv_msg.fd = -1;
968 recv_msg.fd2 = -1;
966 recv_msg.fd[0] = -1;
967 recv_msg.fd[1] = -1;
969 }
970
971 goto fail;
972 }
973 }
974
975 switch (port_msg->type) {
976

--- 5 unchanged lines hidden (view full) ---

982 break;
983
984 case _NXT_PORT_MSG_NEW_PORT:
985 rc = nxt_unit_process_new_port(ctx, &recv_msg);
986 break;
987
988 case _NXT_PORT_MSG_CHANGE_FILE:
989 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
968 }
969
970 goto fail;
971 }
972 }
973
974 switch (port_msg->type) {
975

--- 5 unchanged lines hidden (view full) ---

981 break;
982
983 case _NXT_PORT_MSG_NEW_PORT:
984 rc = nxt_unit_process_new_port(ctx, &recv_msg);
985 break;
986
987 case _NXT_PORT_MSG_CHANGE_FILE:
988 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
990 port_msg->stream, recv_msg.fd);
989 port_msg->stream, recv_msg.fd[0]);
991
990
992 if (dup2(recv_msg.fd, lib->log_fd) == -1) {
991 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
993 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
992 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
994 port_msg->stream, recv_msg.fd, lib->log_fd,
993 port_msg->stream, recv_msg.fd[0], lib->log_fd,
995 strerror(errno), errno);
996
997 goto fail;
998 }
999
1000 rc = NXT_UNIT_OK;
1001 break;
1002
1003 case _NXT_PORT_MSG_MMAP:
994 strerror(errno), errno);
995
996 goto fail;
997 }
998
999 rc = NXT_UNIT_OK;
1000 break;
1001
1002 case _NXT_PORT_MSG_MMAP:
1004 if (nxt_slow_path(recv_msg.fd < 0)) {
1003 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1005 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1004 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1006 port_msg->stream, recv_msg.fd);
1005 port_msg->stream, recv_msg.fd[0]);
1007
1008 goto fail;
1009 }
1010
1006
1007 goto fail;
1008 }
1009
1011 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
1010 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1012 break;
1013
1014 case _NXT_PORT_MSG_REQ_HEADERS:
1015 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
1016 break;
1017
1018 case _NXT_PORT_MSG_REQ_BODY:
1019 rc = nxt_unit_process_req_body(ctx, &recv_msg);

--- 30 unchanged lines hidden (view full) ---

1050 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
1051 port_msg->stream, (int) port_msg->type);
1052
1053 goto fail;
1054 }
1055
1056fail:
1057
1011 break;
1012
1013 case _NXT_PORT_MSG_REQ_HEADERS:
1014 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
1015 break;
1016
1017 case _NXT_PORT_MSG_REQ_BODY:
1018 rc = nxt_unit_process_req_body(ctx, &recv_msg);

--- 30 unchanged lines hidden (view full) ---

1049 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
1050 port_msg->stream, (int) port_msg->type);
1051
1052 goto fail;
1053 }
1054
1055fail:
1056
1058 if (recv_msg.fd != -1) {
1059 nxt_unit_close(recv_msg.fd);
1057 if (recv_msg.fd[0] != -1) {
1058 nxt_unit_close(recv_msg.fd[0]);
1060 }
1061
1059 }
1060
1062 if (recv_msg.fd2 != -1) {
1063 nxt_unit_close(recv_msg.fd2);
1061 if (recv_msg.fd[1] != -1) {
1062 nxt_unit_close(recv_msg.fd[1]);
1064 }
1065
1066 while (recv_msg.incoming_buf != NULL) {
1067 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1068 }
1069
1070 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1071#if (NXT_DEBUG)

--- 17 unchanged lines hidden (view full) ---

1089 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1090 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1091 "invalid message size (%d)",
1092 recv_msg->stream, (int) recv_msg->size);
1093
1094 return NXT_UNIT_ERROR;
1095 }
1096
1063 }
1064
1065 while (recv_msg.incoming_buf != NULL) {
1066 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1067 }
1068
1069 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1070#if (NXT_DEBUG)

--- 17 unchanged lines hidden (view full) ---

1088 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1089 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1090 "invalid message size (%d)",
1091 recv_msg->stream, (int) recv_msg->size);
1092
1093 return NXT_UNIT_ERROR;
1094 }
1095
1097 if (nxt_slow_path(recv_msg->fd < 0)) {
1096 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1098 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1097 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1099 recv_msg->stream, recv_msg->fd);
1098 recv_msg->stream, recv_msg->fd[0]);
1100
1101 return NXT_UNIT_ERROR;
1102 }
1103
1104 new_port_msg = recv_msg->start;
1105
1099
1100 return NXT_UNIT_ERROR;
1101 }
1102
1103 new_port_msg = recv_msg->start;
1104
1106 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
1105 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1107 recv_msg->stream, (int) new_port_msg->pid,
1106 recv_msg->stream, (int) new_port_msg->pid,
1108 (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
1107 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1109
1110 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1111
1112 if (new_port_msg->id == (nxt_port_id_t) -1) {
1113 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1114
1108
1109 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1110
1111 if (new_port_msg->id == (nxt_port_id_t) -1) {
1112 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1113
1115 new_port.in_fd = recv_msg->fd;
1114 new_port.in_fd = recv_msg->fd[0];
1116 new_port.out_fd = -1;
1117
1118 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1115 new_port.out_fd = -1;
1116
1117 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1119 MAP_SHARED, recv_msg->fd2, 0);
1118 MAP_SHARED, recv_msg->fd[1], 0);
1120
1121 } else {
1119
1120 } else {
1122 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) {
1121 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1122 != NXT_UNIT_OK))
1123 {
1123 return NXT_UNIT_ERROR;
1124 }
1125
1126 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1127 new_port_msg->id);
1128
1129 new_port.in_fd = -1;
1124 return NXT_UNIT_ERROR;
1125 }
1126
1127 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1128 new_port_msg->id);
1129
1130 new_port.in_fd = -1;
1130 new_port.out_fd = recv_msg->fd;
1131 new_port.out_fd = recv_msg->fd[0];
1131
1132 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1132
1133 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1133 MAP_SHARED, recv_msg->fd2, 0);
1134 MAP_SHARED, recv_msg->fd[1], 0);
1134 }
1135
1136 if (nxt_slow_path(mem == MAP_FAILED)) {
1135 }
1136
1137 if (nxt_slow_path(mem == MAP_FAILED)) {
1137 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
1138 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1138 strerror(errno), errno);
1139
1140 return NXT_UNIT_ERROR;
1141 }
1142
1143 new_port.data = NULL;
1144
1139 strerror(errno), errno);
1140
1141 return NXT_UNIT_ERROR;
1142 }
1143
1144 new_port.data = NULL;
1145
1145 recv_msg->fd = -1;
1146 recv_msg->fd[0] = -1;
1146
1147 port = nxt_unit_add_port(ctx, &new_port, mem);
1148 if (nxt_slow_path(port == NULL)) {
1149 return NXT_UNIT_ERROR;
1150 }
1151
1152 if (new_port_msg->id == (nxt_port_id_t) -1) {
1153 lib->shared_port = port;

--- 65 unchanged lines hidden (view full) ---

1219 b->req = req;
1220 }
1221
1222 /* "Move" incoming buffer list to req_impl. */
1223 req_impl->incoming_buf = recv_msg->incoming_buf;
1224 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1225 recv_msg->incoming_buf = NULL;
1226
1147
1148 port = nxt_unit_add_port(ctx, &new_port, mem);
1149 if (nxt_slow_path(port == NULL)) {
1150 return NXT_UNIT_ERROR;
1151 }
1152
1153 if (new_port_msg->id == (nxt_port_id_t) -1) {
1154 lib->shared_port = port;

--- 65 unchanged lines hidden (view full) ---

1220 b->req = req;
1221 }
1222
1223 /* "Move" incoming buffer list to req_impl. */
1224 req_impl->incoming_buf = recv_msg->incoming_buf;
1225 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1226 recv_msg->incoming_buf = NULL;
1227
1227 req->content_fd = recv_msg->fd;
1228 recv_msg->fd = -1;
1228 req->content_fd = recv_msg->fd[0];
1229 recv_msg->fd[0] = -1;
1229
1230 req->response_max_fields = 0;
1231 req_impl->state = NXT_UNIT_RS_START;
1232 req_impl->websocket = 0;
1233 req_impl->in_hash = 0;
1234
1235 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1236 (int) r->method_length,

--- 70 unchanged lines hidden (view full) ---

1307 if (recv_msg->incoming_buf != NULL) {
1308 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1309
1310 /* "Move" incoming buffer list to req_impl. */
1311 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
1312 recv_msg->incoming_buf = NULL;
1313 }
1314
1230
1231 req->response_max_fields = 0;
1232 req_impl->state = NXT_UNIT_RS_START;
1233 req_impl->websocket = 0;
1234 req_impl->in_hash = 0;
1235
1236 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1237 (int) r->method_length,

--- 70 unchanged lines hidden (view full) ---

1308 if (recv_msg->incoming_buf != NULL) {
1309 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1310
1311 /* "Move" incoming buffer list to req_impl. */
1312 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
1313 recv_msg->incoming_buf = NULL;
1314 }
1315
1315 req->content_fd = recv_msg->fd;
1316 recv_msg->fd = -1;
1316 req->content_fd = recv_msg->fd[0];
1317 recv_msg->fd[0] = -1;
1317
1318 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1319
1320 if (lib->callbacks.data_handler != NULL) {
1321 lib->callbacks.data_handler(req);
1322
1323 return NXT_UNIT_OK;
1324 }

--- 5004 unchanged lines hidden ---
1318
1319 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1320
1321 if (lib->callbacks.data_handler != NULL) {
1322 lib->callbacks.data_handler(req);
1323
1324 return NXT_UNIT_OK;
1325 }

--- 5004 unchanged lines hidden ---