Deleted Added
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
10#include "nxt_port_queue.h"
11#include "nxt_app_queue.h"
12
13#include "nxt_unit.h"
14#include "nxt_unit_request.h"
15#include "nxt_unit_response.h"
16#include "nxt_unit_websocket.h"
17
18#include "nxt_websocket.h"
19

--- 27 unchanged lines hidden (view full) ---

47nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50 nxt_unit_mmap_buf_t *mmap_buf);
51nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56 int queue_fd);
57static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
58static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
59 nxt_unit_recv_msg_t *recv_msg);
60static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
61 nxt_unit_recv_msg_t *recv_msg);
62static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
63 nxt_unit_recv_msg_t *recv_msg);
64static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
65 nxt_unit_port_id_t *port_id);
66static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
67static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
68 nxt_unit_recv_msg_t *recv_msg);
69static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
70static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
71 nxt_unit_ctx_t *ctx);

--- 20 unchanged lines hidden (view full) ---

92 size_t size);
93static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
94 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
95static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
96static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
97static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
98static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
99 nxt_unit_port_t *port, int n);
100static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
101static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
102 int fd);
103static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
104 nxt_unit_port_t *port, uint32_t size,
105 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
106static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
107
108static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
109nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
110nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
111static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
112static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
113 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
114 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
115static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
116 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
117static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
118static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
119 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
120static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
121
122static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
123 pid_t pid);
124static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
125 pid_t pid, int remove);
126static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
127static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
128static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
129static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
130static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
131nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
132nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
133nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
134nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
135static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
136 nxt_unit_port_t *port);
137static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
138static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
139
140static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
141 nxt_unit_port_t *port, int queue_fd);
142
143nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
144nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
145static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
146 nxt_unit_port_t *port, void *queue);
147static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
148 nxt_unit_port_id_t *port_id);
149static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
150 nxt_unit_port_id_t *port_id);
151static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
152static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
153 nxt_unit_process_t *process);
154static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
155static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
156static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, const void *buf, size_t buf_size,
158 const void *oob, size_t oob_size);
159static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
160 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
161static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
162 nxt_unit_read_buf_t *rbuf);
163nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
164 nxt_unit_read_buf_t *src);
165static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
166 nxt_unit_read_buf_t *rbuf);
167static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
170 nxt_unit_read_buf_t *rbuf);
171static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
173
174static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
175 nxt_unit_port_t *port);
176static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
177 nxt_unit_port_id_t *port_id, int remove);
178
179static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
180 nxt_unit_request_info_t *req);
181static nxt_unit_request_info_t *nxt_unit_request_hash_find(
182 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
183
184static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
185
186
187struct nxt_unit_mmap_buf_s {
188 nxt_unit_buf_t buf;
189
190 nxt_unit_mmap_buf_t *next;

--- 39 unchanged lines hidden (view full) ---

230
231 uint32_t stream;
232
233 nxt_unit_mmap_buf_t *outgoing_buf;
234 nxt_unit_mmap_buf_t *incoming_buf;
235
236 nxt_unit_req_state_t state;
237 uint8_t websocket;
238 uint8_t in_hash;
239
240 /* for nxt_unit_ctx_impl_t.free_req or active_req */
241 nxt_queue_link_t link;
242 /* for nxt_unit_port_impl_t.awaiting_req */
243 nxt_queue_link_t port_wait_link;
244
245 char extra_data[];
246};

--- 116 unchanged lines hidden (view full) ---

363 /* for nxt_unit_process_t.ports */
364 nxt_queue_link_t link;
365 nxt_unit_process_t *process;
366
367 /* of nxt_unit_request_info_impl_t */
368 nxt_queue_t awaiting_req;
369
370 int ready;
371
372 void *queue;
373
374 int from_socket;
375 nxt_unit_read_buf_t *socket_rbuf;
376};
377
378
379struct nxt_unit_process_s {
380 pid_t pid;
381
382 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
383

--- 10 unchanged lines hidden (view full) ---

394 int32_t pid;
395 uint32_t id;
396} nxt_unit_port_hash_id_t;
397
398
399nxt_unit_ctx_t *
400nxt_unit_init(nxt_unit_init_t *init)
401{
402 int rc, queue_fd;
403 void *mem;
404 uint32_t ready_stream, shm_limit;
405 nxt_unit_ctx_t *ctx;
406 nxt_unit_impl_t *lib;
407 nxt_unit_port_t ready_port, router_port, read_port;
408
409 lib = nxt_unit_create(init);
410 if (nxt_slow_path(lib == NULL)) {
411 return NULL;
412 }
413
414 queue_fd = -1;
415
416 if (init->ready_port.id.pid != 0
417 && init->ready_stream != 0
418 && init->read_port.id.pid != 0)
419 {
420 ready_port = init->ready_port;
421 ready_stream = init->ready_stream;
422 router_port = init->router_port;
423 read_port = init->read_port;

--- 20 unchanged lines hidden (view full) ---

444 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
445 lib->shm_mmap_limit = 1;
446 }
447
448 lib->pid = read_port.id.pid;
449
450 ctx = &lib->main_ctx.ctx;
451
452 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
453 if (nxt_slow_path(lib->router_port == NULL)) {
454 nxt_unit_alert(NULL, "failed to add router_port");
455
456 goto fail;
457 }
458
459 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
460 if (nxt_slow_path(queue_fd == -1)) {
461 goto fail;
462 }
463
464 mem = mmap(NULL, sizeof(nxt_port_queue_t),
465 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
466 if (nxt_slow_path(mem == MAP_FAILED)) {
467 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
468 strerror(errno), errno);
469
470 goto fail;
471 }
472
473 nxt_port_queue_init(mem);
474
475 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
476 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
477 nxt_unit_alert(NULL, "failed to add read_port");
478
479 munmap(mem, sizeof(nxt_port_queue_t));
480
481 goto fail;
482 }
483
484 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
485 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
486 nxt_unit_alert(NULL, "failed to send READY message");
487
488 munmap(mem, sizeof(nxt_port_queue_t));
489
490 goto fail;
491 }
492
493 close(ready_port.out_fd);
494 close(queue_fd);
495
496 return ctx;
497
498fail:
499
500 if (queue_fd != -1) {
501 close(queue_fd);
502 }
503
504 nxt_unit_ctx_release(&lib->main_ctx.ctx);
505
506 return NULL;
507}
508
509
510static nxt_unit_impl_t *
511nxt_unit_create(nxt_unit_init_t *init)

--- 32 unchanged lines hidden (view full) ---

544 nxt_queue_init(&lib->contexts);
545
546 lib->use_count = 0;
547 lib->router_port = NULL;
548 lib->shared_port = NULL;
549
550 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
551 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
552 pthread_mutex_destroy(&lib->mutex);
553 goto fail;
554 }
555
556 cb = &lib->callbacks;
557
558 if (cb->request_handler == NULL) {
559 nxt_unit_alert(NULL, "request_handler is NULL");
560
561 pthread_mutex_destroy(&lib->mutex);
562 goto fail;
563 }
564
565 nxt_unit_mmaps_init(&lib->incoming);
566 nxt_unit_mmaps_init(&lib->outgoing);
567
568 return lib;
569

--- 244 unchanged lines hidden (view full) ---

814
815 *stream = ready_stream;
816
817 return NXT_UNIT_OK;
818}
819
820
821static int
822nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
823{
824 ssize_t res;
825 nxt_port_msg_t msg;
826 nxt_unit_impl_t *lib;
827
828 union {
829 struct cmsghdr cm;
830 char space[CMSG_SPACE(sizeof(int))];
831 } cmsg;
832
833 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
834
835 msg.stream = stream;
836 msg.pid = lib->pid;
837 msg.reply_port = 0;
838 msg.type = _NXT_PORT_MSG_PROCESS_READY;
839 msg.last = 1;
840 msg.mmap = 0;
841 msg.nf = 0;
842 msg.mf = 0;
843 msg.tracking = 0;
844
845 memset(&cmsg, 0, sizeof(cmsg));
846
847 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
848 cmsg.cm.cmsg_level = SOL_SOCKET;
849 cmsg.cm.cmsg_type = SCM_RIGHTS;
850
851 /*
852 * memcpy() is used instead of simple
853 * *(int *) CMSG_DATA(&cmsg.cm) = fd;
854 * because GCC 4.4 with -O2/3/s optimization may issue a warning:
855 * dereferencing type-punned pointer will break strict-aliasing rules
856 *
857 * Fortunately, GCC with -O1 compiles this nxt_memcpy()
858 * in the same simple assignment as in the code above.
859 */
860 memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
861
862 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
863 &cmsg, sizeof(cmsg));
864 if (res != sizeof(msg)) {
865 return NXT_UNIT_ERROR;
866 }
867
868 return NXT_UNIT_OK;
869}
870
871

--- 38 unchanged lines hidden (view full) ---

910
911 goto fail;
912 }
913
914 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
915 goto fail;
916 }
917
918 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
919 port_msg->stream, (int) port_msg->type,
920 recv_msg.fd, recv_msg.fd2);
921
922 recv_msg.stream = port_msg->stream;
923 recv_msg.pid = port_msg->pid;
924 recv_msg.reply_port = port_msg->reply_port;
925 recv_msg.last = port_msg->last;
926 recv_msg.mmap = port_msg->mmap;
927
928 recv_msg.start = port_msg + 1;
929 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
930
931 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
932 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
933 port_msg->stream, (int) port_msg->type);
934 goto fail;
935 }
936
937 /* Fragmentation is unsupported. */
938 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
939 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
940 port_msg->stream, (int) port_msg->type);
941 goto fail;
942 }
943
944 if (port_msg->mmap) {

--- 47 unchanged lines hidden (view full) ---

992
993 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
994 break;
995
996 case _NXT_PORT_MSG_REQ_HEADERS:
997 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
998 break;
999
1000 case _NXT_PORT_MSG_REQ_BODY:
1001 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1002 break;
1003
1004 case _NXT_PORT_MSG_WEBSOCKET:
1005 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1006 break;
1007
1008 case _NXT_PORT_MSG_REMOVE_PID:
1009 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1010 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
1011 "(%d != %d)", port_msg->stream, (int) recv_msg.size,

--- 47 unchanged lines hidden (view full) ---

1059 return rc;
1060}
1061
1062
1063static int
1064nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1065{
1066 int nb;
1067 void *mem;
1068 nxt_unit_impl_t *lib;
1069 nxt_unit_port_t new_port, *port;
1070 nxt_port_msg_new_port_t *new_port_msg;
1071
1072 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1073 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1074 "invalid message size (%d)",
1075 recv_msg->stream, (int) recv_msg->size);

--- 5 unchanged lines hidden (view full) ---

1081 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1082 recv_msg->stream, recv_msg->fd);
1083
1084 return NXT_UNIT_ERROR;
1085 }
1086
1087 new_port_msg = recv_msg->start;
1088
1089 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
1090 recv_msg->stream, (int) new_port_msg->pid,
1091 (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
1092
1093 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1094
1095 if (new_port_msg->id == (nxt_port_id_t) -1) {
1096 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1097
1098 new_port.in_fd = recv_msg->fd;
1099 new_port.out_fd = -1;
1100
1101 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1102 MAP_SHARED, recv_msg->fd2, 0);
1103
1104 } else {
1105 nb = 0;
1106
1107 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
1108 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
1109 "failed: %s (%d)",
1110 recv_msg->stream, recv_msg->fd, strerror(errno), errno);
1111
1112 return NXT_UNIT_ERROR;
1113 }
1114
1115 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1116 new_port_msg->id);
1117
1118 new_port.in_fd = -1;
1119 new_port.out_fd = recv_msg->fd;
1120
1121 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1122 MAP_SHARED, recv_msg->fd2, 0);
1123 }
1124
1125 if (nxt_slow_path(mem == MAP_FAILED)) {
1126 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
1127 strerror(errno), errno);
1128
1129 return NXT_UNIT_ERROR;
1130 }
1131
1132 new_port.data = NULL;
1133
1134 recv_msg->fd = -1;
1135
1136 port = nxt_unit_add_port(ctx, &new_port, mem);
1137 if (nxt_slow_path(port == NULL)) {
1138 return NXT_UNIT_ERROR;
1139 }
1140
1141 if (new_port_msg->id == (nxt_port_id_t) -1) {
1142 lib->shared_port = port;
1143
1144 } else {

--- 69 unchanged lines hidden (view full) ---

1214 recv_msg->incoming_buf = NULL;
1215
1216 req->content_fd = recv_msg->fd;
1217 recv_msg->fd = -1;
1218
1219 req->response_max_fields = 0;
1220 req_impl->state = NXT_UNIT_RS_START;
1221 req_impl->websocket = 0;
1222 req_impl->in_hash = 0;
1223
1224 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1225 (int) r->method_length,
1226 (char *) nxt_unit_sptr_get(&r->method),
1227 (int) r->target_length,
1228 (char *) nxt_unit_sptr_get(&r->target),
1229 (int) r->content_length);
1230
1231 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1232
1233 res = nxt_unit_request_check_response_port(req, &port_id);
1234 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1235 return NXT_UNIT_ERROR;
1236 }
1237
1238 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1239 res = nxt_unit_send_req_headers_ack(req);
1240 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1241 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1242
1243 return NXT_UNIT_ERROR;
1244 }
1245
1246 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1247
1248 if (req->content_length
1249 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1250 {
1251 res = nxt_unit_request_hash_add(ctx, req);
1252 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1253 nxt_unit_req_warn(req, "failed to add request to hash");
1254
1255 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1256
1257 return NXT_UNIT_ERROR;
1258 }
1259
1260 /*
1261 * If application have separate data handler, we may start
1262 * request processing and process data when it is arrived.
1263 */
1264 if (lib->callbacks.data_handler == NULL) {
1265 return NXT_UNIT_OK;
1266 }
1267 }
1268
1269 lib->callbacks.request_handler(req);
1270 }
1271
1272 return NXT_UNIT_OK;
1273}
1274
1275
1276static int
1277nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1278{
1279 uint64_t l;
1280 nxt_unit_impl_t *lib;
1281 nxt_unit_mmap_buf_t *b;
1282 nxt_unit_request_info_t *req;
1283
1284 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1285 if (req == NULL) {
1286 return NXT_UNIT_OK;
1287 }
1288
1289 l = req->content_buf->end - req->content_buf->free;
1290
1291 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1292 b->req = req;
1293 l += b->buf.end - b->buf.free;
1294 }
1295
1296 if (recv_msg->incoming_buf != NULL) {
1297 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1298
1299 /* "Move" incoming buffer list to req_impl. */
1300 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
1301 recv_msg->incoming_buf = NULL;
1302 }
1303
1304 req->content_fd = recv_msg->fd;
1305 recv_msg->fd = -1;
1306
1307 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1308
1309 if (lib->callbacks.data_handler != NULL) {
1310 lib->callbacks.data_handler(req);
1311
1312 return NXT_UNIT_OK;
1313 }
1314
1315 if (req->content_fd != -1 || l == req->content_length) {
1316 lib->callbacks.request_handler(req);
1317 }
1318
1319 return NXT_UNIT_OK;
1320}
1321
1322
1323static int
1324nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1325 nxt_unit_port_id_t *port_id)
1326{
1327 int res;
1328 nxt_unit_ctx_t *ctx;
1329 nxt_unit_impl_t *lib;
1330 nxt_unit_port_t *port;
1331 nxt_unit_process_t *process;

--- 79 unchanged lines hidden (view full) ---

1411 free(port);
1412
1413 return NXT_UNIT_ERROR;
1414 }
1415
1416 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1417
1418 port_impl->process = process;
1419 port_impl->queue = NULL;
1420 port_impl->from_socket = 0;
1421 port_impl->socket_rbuf = NULL;
1422
1423 nxt_queue_init(&port_impl->awaiting_req);
1424
1425 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1426
1427 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1428
1429 port_impl->use_count = 2;

--- 45 unchanged lines hidden (view full) ---

1475
1476
1477static int
1478nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1479{
1480 size_t hsize;
1481 nxt_unit_impl_t *lib;
1482 nxt_unit_mmap_buf_t *b;
1483 nxt_unit_callbacks_t *cb;
1484 nxt_unit_request_info_t *req;
1485 nxt_unit_request_info_impl_t *req_impl;
1486 nxt_unit_websocket_frame_impl_t *ws_impl;
1487
1488 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1489 if (nxt_slow_path(req == NULL)) {
1490 return NXT_UNIT_OK;
1491 }
1492
1493 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1494
1495 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1496 cb = &lib->callbacks;
1497
1498 if (cb->websocket_handler && recv_msg->size >= 2) {
1499 ws_impl = nxt_unit_websocket_frame_get(ctx);
1500 if (nxt_slow_path(ws_impl == NULL)) {
1501 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",

--- 149 unchanged lines hidden (view full) ---

1651 nxt_unit_request_info_impl_t *req_impl;
1652
1653 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1654 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1655
1656 req->response = NULL;
1657 req->response_buf = NULL;
1658
1659 if (req_impl->in_hash) {
1660 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1661 }
1662
1663 req_impl->websocket = 0;
1664
1665 while (req_impl->outgoing_buf != NULL) {
1666 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1667 }
1668
1669 while (req_impl->incoming_buf != NULL) {
1670 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1671 }
1672

--- 647 unchanged lines hidden (view full) ---

2320 return req->request->websocket_handshake;
2321}
2322
2323
2324int
2325nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2326{
2327 int rc;
2328 nxt_unit_request_info_impl_t *req_impl;
2329
2330 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2331
2332 if (nxt_slow_path(req_impl->websocket != 0)) {
2333 nxt_unit_req_debug(req, "upgrade: already upgraded");
2334
2335 return NXT_UNIT_OK;

--- 6 unchanged lines hidden (view full) ---

2342 }
2343
2344 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2345 nxt_unit_req_warn(req, "upgrade: response already sent");
2346
2347 return NXT_UNIT_ERROR;
2348 }
2349
2350 rc = nxt_unit_request_hash_add(req->ctx, req);
2351 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2352 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2353
2354 return NXT_UNIT_ERROR;
2355 }
2356
2357 req_impl->websocket = 1;
2358

--- 254 unchanged lines hidden (view full) ---

2613 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2614
2615 pthread_mutex_lock(&ctx_impl->mutex);
2616
2617 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2618
2619 pthread_mutex_unlock(&ctx_impl->mutex);
2620
2621 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2622
2623 return rbuf;
2624}
2625
2626
2627static nxt_unit_read_buf_t *
2628nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2629{
2630 nxt_queue_link_t *link;

--- 82 unchanged lines hidden (view full) ---

2713 int rc;
2714 ssize_t sent;
2715 uint32_t part_size, min_part_size, buf_size;
2716 const char *part_start;
2717 nxt_unit_mmap_buf_t mmap_buf;
2718 nxt_unit_request_info_impl_t *req_impl;
2719 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2720
2721 nxt_unit_req_debug(req, "write: %d", (int) size);
2722
2723 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2724
2725 part_start = start;
2726 sent = 0;
2727
2728 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2729 nxt_unit_req_alert(req, "write: response not initialized yet");
2730

--- 163 unchanged lines hidden (view full) ---

2894ssize_t
2895nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2896{
2897 ssize_t buf_res, res;
2898
2899 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2900 dst, size);
2901
2902 nxt_unit_req_debug(req, "read: %d", (int) buf_res);
2903
2904 if (buf_res < (ssize_t) size && req->content_fd != -1) {
2905 res = read(req->content_fd, dst, size);
2906 if (nxt_slow_path(res < 0)) {
2907 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2908 strerror(errno), errno);
2909
2910 return res;
2911 }
2912
2913 if (res < (ssize_t) size) {
2914 close(req->content_fd);

--- 539 unchanged lines hidden (view full) ---

3454
3455 return NXT_UNIT_OK;
3456}
3457
3458
3459static int
3460nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3461{
3462 int res;
3463 nxt_unit_ctx_impl_t *ctx_impl;
3464 nxt_unit_read_buf_t *rbuf;
3465
3466 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3467
3468 while (1) {
3469 rbuf = nxt_unit_read_buf_get(ctx);
3470 if (nxt_slow_path(rbuf == NULL)) {
3471 return NXT_UNIT_ERROR;
3472 }
3473
3474 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3475 if (res == NXT_UNIT_ERROR) {
3476 nxt_unit_read_buf_release(ctx, rbuf);
3477
3478 return NXT_UNIT_ERROR;
3479 }
3480
3481 if (nxt_unit_is_shm_ack(rbuf)) {
3482 nxt_unit_read_buf_release(ctx, rbuf);
3483 break;
3484 }
3485
3486 pthread_mutex_lock(&ctx_impl->mutex);
3487
3488 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3489
3490 pthread_mutex_unlock(&ctx_impl->mutex);
3491
3492 if (nxt_unit_is_quit(rbuf)) {
3493 nxt_unit_debug(ctx, "oosm: quit received");
3494
3495 return NXT_UNIT_ERROR;
3496 }
3497 }
3498
3499 return NXT_UNIT_OK;
3500}

--- 52 unchanged lines hidden (view full) ---

3553}
3554
3555
3556static nxt_port_mmap_header_t *
3557nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3558{
3559 int i, fd, rc;
3560 void *mem;
3561 nxt_unit_mmap_t *mm;
3562 nxt_unit_impl_t *lib;
3563 nxt_port_mmap_header_t *hdr;
3564
3565 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3566
3567 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3568 if (nxt_slow_path(mm == NULL)) {
3569 nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3570
3571 return NULL;
3572 }
3573
3574 fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3575 if (nxt_slow_path(fd == -1)) {
3576 goto remove_fail;
3577 }
3578
3579 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3580 if (nxt_slow_path(mem == MAP_FAILED)) {
3581 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3582 strerror(errno), errno);
3583
3584 close(fd);
3585
3586 goto remove_fail;
3587 }
3588
3589 mm->hdr = mem;
3590 hdr = mem;
3591
3592 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3593 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

--- 36 unchanged lines hidden (view full) ---

3630
3631 lib->outgoing.size--;
3632
3633 return NULL;
3634}
3635
3636
3637static int
3638nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3639{
3640 int fd;
3641 nxt_unit_impl_t *lib;
3642
3643 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3644
3645#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3646 char name[64];
3647
3648 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3649 lib->pid, (void *) pthread_self());
3650#endif
3651
3652#if (NXT_HAVE_MEMFD_CREATE)
3653
3654 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3655 if (nxt_slow_path(fd == -1)) {
3656 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3657 strerror(errno), errno);
3658
3659 return -1;
3660 }
3661
3662 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3663
3664#elif (NXT_HAVE_SHM_OPEN_ANON)
3665
3666 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3667 if (nxt_slow_path(fd == -1)) {
3668 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3669 strerror(errno), errno);
3670
3671 return -1;
3672 }
3673
3674#elif (NXT_HAVE_SHM_OPEN)
3675
3676 /* Just in case. */
3677 shm_unlink(name);
3678
3679 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3680 if (nxt_slow_path(fd == -1)) {
3681 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3682 strerror(errno), errno);
3683
3684 return -1;
3685 }
3686
3687 if (nxt_slow_path(shm_unlink(name) == -1)) {
3688 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3689 strerror(errno), errno);
3690 }
3691
3692#else
3693
3694#error No working shared memory implementation.
3695
3696#endif
3697
3698 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3699 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3700 strerror(errno), errno);
3701
3702 close(fd);
3703
3704 return -1;
3705 }
3706
3707 return fd;
3708}
3709
3710
3711static int
3712nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3713{
3714 ssize_t res;
3715 nxt_port_msg_t msg;
3716 nxt_unit_impl_t *lib;
3717 union {
3718 struct cmsghdr cm;
3719 char space[CMSG_SPACE(sizeof(int))];

--- 248 unchanged lines hidden (view full) ---

3968 free(mmaps->elts);
3969 }
3970
3971 pthread_mutex_destroy(&mmaps->mutex);
3972}
3973
3974
3975static int
3976nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
3977 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
3978 nxt_unit_read_buf_t *rbuf)
3979{
3980 int res, need_rbuf;
3981 nxt_unit_mmap_t *mm;
3982 nxt_unit_ctx_impl_t *ctx_impl;
3983

--- 286 unchanged lines hidden (view full) ---

4270 process = malloc(sizeof(nxt_unit_process_t));
4271 if (nxt_slow_path(process == NULL)) {
4272 nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
4273
4274 return NULL;
4275 }
4276
4277 process->pid = pid;
4278 process->use_count = 2;
4279 process->next_port_id = 0;
4280 process->lib = lib;
4281
4282 nxt_queue_init(&process->ports);
4283
4284 lhq.replace = 0;
4285 lhq.value = process;
4286

--- 5 unchanged lines hidden (view full) ---

4292 default:
4293 nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
4294
4295 free(process);
4296 process = NULL;
4297 break;
4298 }
4299
4300 return process;
4301}
4302
4303
4304static nxt_unit_process_t *
4305nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4306{
4307 int rc;

--- 99 unchanged lines hidden (view full) ---

4407
4408 return rc;
4409}
4410
4411
4412static int
4413nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4414{
4415 int nevents, res, err;
4416 nxt_unit_impl_t *lib;
4417 nxt_unit_ctx_impl_t *ctx_impl;
4418 nxt_unit_port_impl_t *port_impl;
4419 struct pollfd fds[2];
4420
4421 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4422 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4423
4424 if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
4425
4426 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4427 }
4428
4429 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4430 port);
4431
4432retry:
4433
4434 if (port_impl->from_socket == 0) {
4435 res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4436 if (res == NXT_UNIT_OK) {
4437 if (nxt_unit_is_read_socket(rbuf)) {
4438 port_impl->from_socket++;
4439
4440 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4441 (int) ctx_impl->read_port->id.pid,
4442 (int) ctx_impl->read_port->id.id,
4443 port_impl->from_socket);
4444
4445 } else {
4446 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4447 (int) ctx_impl->read_port->id.pid,
4448 (int) ctx_impl->read_port->id.id,
4449 (int) rbuf->size);
4450
4451 return NXT_UNIT_OK;
4452 }
4453 }
4454 }
4455
4456 res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4457 if (res == NXT_UNIT_OK) {
4458 return NXT_UNIT_OK;
4459 }
4460
4461 fds[0].fd = ctx_impl->read_port->in_fd;
4462 fds[0].events = POLLIN;
4463 fds[0].revents = 0;
4464
4465 fds[1].fd = lib->shared_port->in_fd;
4466 fds[1].events = POLLIN;
4467 fds[1].revents = 0;
4468
4469 nevents = poll(fds, 2, -1);
4470 if (nxt_slow_path(nevents == -1)) {
4471 err = errno;
4472
4473 if (err == EINTR) {
4474 goto retry;
4475 }
4476
4477 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4478 fds[0].fd, fds[1].fd, strerror(err), err);
4479
4480 rbuf->size = -1;
4481
4482 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4483 }
4484
4485 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4486 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4487 fds[1].revents);
4488
4489 if ((fds[0].revents & POLLIN) != 0) {
4490 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4491 if (res == NXT_UNIT_AGAIN) {
4492 goto retry;
4493 }
4494
4495 return res;
4496 }
4497
4498 if ((fds[1].revents & POLLIN) != 0) {
4499 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4500 if (res == NXT_UNIT_AGAIN) {
4501 goto retry;
4502 }
4503
4504 return res;
4505 }
4506
4507 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4508 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4509 fds[1].revents);
4510
4511 return NXT_UNIT_ERROR;
4512}
4513
4514
4515static int
4516nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4517{

--- 34 unchanged lines hidden (view full) ---

4552
4553 return rc;
4554}
4555
4556
4557static void
4558nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4559{
4560 int res;
4561 nxt_queue_t ready_req;
4562 nxt_unit_impl_t *lib;
4563 nxt_unit_ctx_impl_t *ctx_impl;
4564 nxt_unit_request_info_t *req;
4565 nxt_unit_request_info_impl_t *req_impl;
4566
4567 nxt_queue_init(&ready_req);
4568
4569 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4570
4571 pthread_mutex_lock(&ctx_impl->mutex);
4572

--- 8 unchanged lines hidden (view full) ---

4581
4582 pthread_mutex_unlock(&ctx_impl->mutex);
4583
4584 nxt_queue_each(req_impl, &ready_req,
4585 nxt_unit_request_info_impl_t, port_wait_link)
4586 {
4587 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4588
4589 req = &req_impl->req;
4590
4591 res = nxt_unit_send_req_headers_ack(req);
4592 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4593 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4594
4595 continue;
4596 }
4597
4598 if (req->content_length
4599 > (uint64_t) (req->content_buf->end - req->content_buf->free))
4600 {
4601 res = nxt_unit_request_hash_add(ctx, req);
4602 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4603 nxt_unit_req_warn(req, "failed to add request to hash");
4604
4605 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4606
4607 continue;
4608 }
4609
4610 /*
4611 * If application have separate data handler, we may start
4612 * request processing and process data when it is arrived.
4613 */
4614 if (lib->callbacks.data_handler == NULL) {
4615 continue;
4616 }
4617 }
4618
4619 lib->callbacks.request_handler(&req_impl->req);
4620
4621 } nxt_queue_loop;
4622}
4623
4624
4625int
4626nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4627{
4628 int rc;
4629 nxt_unit_impl_t *lib;
4630 nxt_unit_read_buf_t *rbuf;
4631 nxt_unit_ctx_impl_t *ctx_impl;
4632
4633 nxt_unit_ctx_use(ctx);
4634
4635 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4636 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4637
4638 rc = NXT_UNIT_OK;
4639
4640 while (nxt_fast_path(lib->online)) {
4641 rbuf = nxt_unit_read_buf_get(ctx);
4642 if (nxt_slow_path(rbuf == NULL)) {
4643 rc = NXT_UNIT_ERROR;
4644 break;
4645 }
4646
4647 retry:
4648
4649 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4650 if (rc == NXT_UNIT_AGAIN) {
4651 goto retry;
4652 }
4653
4654 rc = nxt_unit_process_msg(ctx, rbuf);
4655 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4656 break;
4657 }
4658
4659 rc = nxt_unit_process_pending_rbuf(ctx);
4660 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4661 break;
4662 }
4663
4664 nxt_unit_process_ready_req(ctx);
4665 }
4666
4667 nxt_unit_ctx_release(ctx);
4668
4669 return rc;
4670}
4671
4672
4673nxt_inline int
4674nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4675{
4676 nxt_port_msg_t *port_msg;
4677
4678 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4679 port_msg = (nxt_port_msg_t *) rbuf->buf;
4680
4681 return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4682 }
4683
4684 return 0;
4685}
4686
4687
4688nxt_inline int
4689nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4690{
4691 if (nxt_fast_path(rbuf->size == 1)) {
4692 return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4693 }
4694
4695 return 0;
4696}
4697
4698
4699nxt_inline int
4700nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4701{
4702 nxt_port_msg_t *port_msg;
4703
4704 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4705 port_msg = (nxt_port_msg_t *) rbuf->buf;
4706
4707 return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4708 }
4709
4710 return 0;
4711}
4712
4713
4714nxt_inline int
4715nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4716{
4717 nxt_port_msg_t *port_msg;
4718
4719 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4720 port_msg = (nxt_port_msg_t *) rbuf->buf;
4721
4722 return port_msg->type == _NXT_PORT_MSG_QUIT;
4723 }
4724
4725 return 0;
4726}
4727
4728
4729int
4730nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4731{
4732 int rc;
4733 nxt_unit_impl_t *lib;
4734 nxt_unit_read_buf_t *rbuf;
4735
4736 nxt_unit_ctx_use(ctx);
4737
4738 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4739 rc = NXT_UNIT_OK;
4740
4741 while (nxt_fast_path(lib->online)) {
4742 rbuf = nxt_unit_read_buf_get(ctx);
4743 if (nxt_slow_path(rbuf == NULL)) {
4744 rc = NXT_UNIT_ERROR;
4745 break;
4746 }
4747
4748 retry:
4749
4750 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4751 if (rc == NXT_UNIT_AGAIN) {
4752 goto retry;
4753 }
4754
4755 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4756 nxt_unit_read_buf_release(ctx, rbuf);
4757 break;
4758 }
4759
4760 rc = nxt_unit_process_msg(ctx, rbuf);
4761 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4762 break;
4763 }
4764
4765 rc = nxt_unit_process_pending_rbuf(ctx);
4766 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4767 break;
4768 }
4769
4770 nxt_unit_process_ready_req(ctx);
4771 }
4772
4773 nxt_unit_ctx_release(ctx);
4774
4775 return rc;
4776}
4777
4778

--- 11 unchanged lines hidden (view full) ---

4790 return rc;
4791}
4792
4793
4794static int
4795nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4796{
4797 int rc;
4798 nxt_unit_impl_t *lib;
4799 nxt_unit_read_buf_t *rbuf;
4800
4801 rbuf = nxt_unit_read_buf_get(ctx);
4802 if (nxt_slow_path(rbuf == NULL)) {
4803 return NXT_UNIT_ERROR;
4804 }
4805
4806 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4807
4808retry:
4809
4810 if (port == lib->shared_port) {
4811 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
4812
4813 } else {
4814 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
4815 }
4816
4817 if (rc != NXT_UNIT_OK) {
4818 nxt_unit_read_buf_release(ctx, rbuf);
4819 return rc;
4820 }
4821
4822 rc = nxt_unit_process_msg(ctx, rbuf);
4823 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4824 return NXT_UNIT_ERROR;
4825 }
4826
4827 rc = nxt_unit_process_pending_rbuf(ctx);
4828 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4829 return NXT_UNIT_ERROR;
4830 }
4831
4832 nxt_unit_process_ready_req(ctx);
4833
4834 rbuf = nxt_unit_read_buf_get(ctx);
4835 if (nxt_slow_path(rbuf == NULL)) {
4836 return NXT_UNIT_ERROR;
4837 }
4838
4839 if (lib->online) {
4840 goto retry;
4841 }
4842
4843 return rc;
4844}
4845
4846
4847void
4848nxt_unit_done(nxt_unit_ctx_t *ctx)
4849{
4850 nxt_unit_ctx_release(ctx);
4851}
4852
4853
4854nxt_unit_ctx_t *
4855nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4856{
4857 int rc, queue_fd;
4858 void *mem;
4859 nxt_unit_impl_t *lib;
4860 nxt_unit_port_t *port;
4861 nxt_unit_ctx_impl_t *new_ctx;
4862 nxt_unit_port_impl_t *port_impl;
4863
4864 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4865
4866 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4867 if (nxt_slow_path(new_ctx == NULL)) {
4868 nxt_unit_alert(ctx, "failed to allocate context");
4869
4870 return NULL;
4871 }
4872
4873 rc = nxt_unit_ctx_init(lib, new_ctx, data);
4874 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4875 free(new_ctx);
4876
4877 return NULL;
4878 }
4879
4880 queue_fd = -1;
4881
4882 port = nxt_unit_create_port(ctx);
4883 if (nxt_slow_path(port == NULL)) {
4884 goto fail;
4885 }
4886
4887 new_ctx->read_port = port;
4888
4889 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
4890 if (nxt_slow_path(queue_fd == -1)) {
4891 goto fail;
4892 }
4893
4894 mem = mmap(NULL, sizeof(nxt_port_queue_t),
4895 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
4896 if (nxt_slow_path(mem == MAP_FAILED)) {
4897 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
4898 strerror(errno), errno);
4899
4900 goto fail;
4901 }
4902
4903 nxt_port_queue_init(mem);
4904
4905 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4906 port_impl->queue = mem;
4907
4908 rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4909 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4910 goto fail;
4911 }
4912
4913 close(queue_fd);
4914
4915 return &new_ctx->ctx;
4916
4917fail:
4918
4919 if (queue_fd != -1) {
4920 close(queue_fd);
4921 }
4922
4923 nxt_unit_ctx_release(&new_ctx->ctx);
4924
4925 return NULL;
4926}
4927
4928
4929static void
4930nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
4931{

--- 36 unchanged lines hidden (view full) ---

4968
4969 } nxt_queue_loop;
4970
4971 pthread_mutex_destroy(&ctx_impl->mutex);
4972
4973 nxt_queue_remove(&ctx_impl->link);
4974
4975 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
4976 nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
4977 nxt_unit_port_release(ctx_impl->read_port);
4978 }
4979
4980 if (ctx_impl != &lib->main_ctx) {
4981 free(ctx_impl);
4982 }
4983
4984 nxt_unit_lib_release(lib);

--- 60 unchanged lines hidden (view full) ---

5045 new_port.in_fd = port_sockets[0];
5046 new_port.out_fd = port_sockets[1];
5047 new_port.data = NULL;
5048
5049 pthread_mutex_unlock(&lib->mutex);
5050
5051 nxt_unit_process_release(process);
5052
5053 port = nxt_unit_add_port(ctx, &new_port, NULL);
5054 if (nxt_slow_path(port == NULL)) {
5055 close(port_sockets[0]);
5056 close(port_sockets[1]);
5057 }
5058
5059 return port;
5060}
5061
5062
5063static int
5064nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5065 nxt_unit_port_t *port, int queue_fd)
5066{
5067 ssize_t res;
5068 nxt_unit_impl_t *lib;
5069 int fds[2] = { port->out_fd, queue_fd };
5070
5071 struct {
5072 nxt_port_msg_t msg;
5073 nxt_port_msg_new_port_t new_port;
5074 } m;
5075
5076 union {
5077 struct cmsghdr cm;
5078 char space[CMSG_SPACE(sizeof(int) * 2)];
5079 } cmsg;
5080
5081 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5082
5083 m.msg.stream = 0;
5084 m.msg.pid = lib->pid;
5085 m.msg.reply_port = 0;
5086 m.msg.type = _NXT_PORT_MSG_NEW_PORT;

--- 6 unchanged lines hidden (view full) ---

5093 m.new_port.id = port->id.id;
5094 m.new_port.pid = port->id.pid;
5095 m.new_port.type = NXT_PROCESS_APP;
5096 m.new_port.max_size = 16 * 1024;
5097 m.new_port.max_share = 64 * 1024;
5098
5099 memset(&cmsg, 0, sizeof(cmsg));
5100
5101 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
5102 cmsg.cm.cmsg_level = SOL_SOCKET;
5103 cmsg.cm.cmsg_type = SCM_RIGHTS;
5104
5105 /*
5106 * memcpy() is used instead of simple
5107 * *(int *) CMSG_DATA(&cmsg.cm) = fd;
5108 * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5109 * dereferencing type-punned pointer will break strict-aliasing rules
5110 *
5111 * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5112 * in the same simple assignment as in the code above.
5113 */
5114 memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
5115
5116 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5117
5118 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5119}
5120
5121
5122nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)

--- 11 unchanged lines hidden (view full) ---

5134 long c;
5135 nxt_unit_port_impl_t *port_impl;
5136
5137 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5138
5139 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5140
5141 if (c == 1) {
5142 nxt_unit_debug(NULL, "destroy port{%d,%d}",
5143 (int) port->id.pid, (int) port->id.id);
5144
5145 nxt_unit_process_release(port_impl->process);
5146
5147 if (port->in_fd != -1) {
5148 close(port->in_fd);
5149
5150 port->in_fd = -1;
5151 }
5152
5153 if (port->out_fd != -1) {
5154 close(port->out_fd);
5155
5156 port->out_fd = -1;
5157 }
5158
5159 if (port->in_fd != -1) {
5160 close(port->in_fd);
5161
5162 port->in_fd = -1;
5163 }
5164
5165 if (port->out_fd != -1) {
5166 close(port->out_fd);
5167
5168 port->out_fd = -1;
5169 }
5170
5171 if (port_impl->queue != NULL) {
5172 munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
5173 ? sizeof(nxt_app_queue_t)
5174 : sizeof(nxt_port_queue_t));
5175 }
5176
5177 free(port_impl);
5178 }
5179}
5180
5181
5182static nxt_unit_port_t *
5183nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5184{
5185 int rc;
5186 nxt_queue_t awaiting_req;
5187 nxt_unit_impl_t *lib;
5188 nxt_unit_port_t *old_port;
5189 nxt_unit_process_t *process;
5190 nxt_unit_ctx_impl_t *ctx_impl;
5191 nxt_unit_port_impl_t *new_port, *old_port_impl;
5192 nxt_unit_request_info_impl_t *req_impl;
5193
5194 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5195
5196 pthread_mutex_lock(&lib->mutex);
5197
5198 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5199
5200 if (nxt_slow_path(old_port != NULL)) {
5201 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5202 "in_fd %d out_fd %d queue %p",
5203 port->id.pid, port->id.id,
5204 port->in_fd, port->out_fd, queue);
5205
5206 if (old_port->data == NULL) {
5207 old_port->data = port->data;
5208 port->data = NULL;
5209 }
5210
5211 if (old_port->in_fd == -1) {
5212 old_port->in_fd = port->in_fd;

--- 16 unchanged lines hidden (view full) ---

5229 }
5230
5231 *port = *old_port;
5232
5233 nxt_queue_init(&awaiting_req);
5234
5235 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5236
5237 if (old_port_impl->queue == NULL) {
5238 old_port_impl->queue = queue;
5239 }
5240
5241 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5242 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5243 nxt_queue_init(&old_port_impl->awaiting_req);
5244 }
5245
5246 old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
5247
5248 pthread_mutex_unlock(&lib->mutex);

--- 23 unchanged lines hidden (view full) ---

5272
5273 } nxt_queue_loop;
5274
5275 return old_port;
5276 }
5277
5278 new_port = NULL;
5279
5280 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5281 port->id.pid, port->id.id,
5282 port->in_fd, port->out_fd, queue);
5283
5284 process = nxt_unit_process_get(lib, port->id.pid);
5285 if (nxt_slow_path(process == NULL)) {
5286 goto unlock;
5287 }
5288
5289 if (port->id.id >= process->next_port_id) {
5290 process->next_port_id = port->id.id + 1;
5291 }
5292
5293 new_port = malloc(sizeof(nxt_unit_port_impl_t));
5294 if (nxt_slow_path(new_port == NULL)) {
5295 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5296 port->id.pid, port->id.id);
5297
5298 goto unlock;
5299 }
5300
5301 new_port->port = *port;
5302
5303 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5304 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5305 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",

--- 6 unchanged lines hidden (view full) ---

5312 goto unlock;
5313 }
5314
5315 nxt_queue_insert_tail(&process->ports, &new_port->link);
5316
5317 new_port->use_count = 2;
5318 new_port->process = process;
5319 new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
5320 new_port->queue = queue;
5321 new_port->from_socket = 0;
5322 new_port->socket_rbuf = NULL;
5323
5324 nxt_queue_init(&new_port->awaiting_req);
5325
5326 process = NULL;
5327
5328unlock:
5329
5330 pthread_mutex_unlock(&lib->mutex);

--- 43 unchanged lines hidden (view full) ---

5374
5375static nxt_unit_port_t *
5376nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5377{
5378 nxt_unit_port_t *port;
5379
5380 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5381 if (nxt_slow_path(port == NULL)) {
5382 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5383 (int) port_id->pid, (int) port_id->id);
5384
5385 return NULL;
5386 }
5387
5388 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5389 (int) port_id->pid, (int) port_id->id,
5390 port->in_fd, port->out_fd, port->data);
5391
5392 return port;
5393}
5394
5395
5396static void

--- 56 unchanged lines hidden (view full) ---

5453
5454static void
5455nxt_unit_quit(nxt_unit_ctx_t *ctx)
5456{
5457 nxt_unit_impl_t *lib;
5458
5459 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5460
5461 if (lib->online) {
5462 lib->online = 0;
5463
5464 if (lib->callbacks.quit != NULL) {
5465 lib->callbacks.quit(ctx);
5466 }
5467 }
5468}
5469
5470
5471static int
5472nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5473{
5474 ssize_t res;

--- 28 unchanged lines hidden (view full) ---

5503 return NXT_UNIT_OK;
5504}
5505
5506
5507static ssize_t
5508nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5509 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5510{
5511 int notify;
5512 ssize_t ret;
5513 nxt_int_t rc;
5514 nxt_port_msg_t msg;
5515 nxt_unit_impl_t *lib;
5516 nxt_unit_port_impl_t *port_impl;
5517
5518 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5519
5520 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5521 if (port_impl->queue != NULL && oob_size == 0
5522 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5523 {
5524 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5525 if (nxt_slow_path(rc != NXT_OK)) {
5526 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5527 (int) port->id.pid, (int) port->id.id);
5528
5529 return -1;
5530 }
5531
5532 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5533 (int) port->id.pid, (int) port->id.id,
5534 (int) buf_size, notify);
5535
5536 if (notify) {
5537 memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5538
5539 msg.type = _NXT_PORT_MSG_READ_QUEUE;
5540
5541 if (lib->callbacks.port_send == NULL) {
5542 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5543 sizeof(nxt_port_msg_t), NULL, 0);
5544
5545 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5546 (int) port->id.pid, (int) port->id.id,
5547 (int) ret);
5548
5549 } else {
5550 ret = lib->callbacks.port_send(ctx, port, &msg,
5551 sizeof(nxt_port_msg_t), NULL, 0);
5552
5553 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5554 (int) port->id.pid, (int) port->id.id,
5555 (int) ret);
5556 }
5557
5558 }
5559
5560 return buf_size;
5561 }
5562
5563 if (port_impl->queue != NULL) {
5564 msg.type = _NXT_PORT_MSG_READ_SOCKET;
5565
5566 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5567 if (nxt_slow_path(rc != NXT_OK)) {
5568 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5569 (int) port->id.pid, (int) port->id.id);
5570
5571 return -1;
5572 }
5573
5574 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5575 (int) port->id.pid, (int) port->id.id, notify);
5576 }
5577
5578 if (lib->callbacks.port_send != NULL) {
5579 ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5580 oob, oob_size);
5581
5582 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5583 (int) port->id.pid, (int) port->id.id,
5584 (int) ret);
5585
5586 } else {
5587 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5588 oob, oob_size);
5589
5590 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5591 (int) port->id.pid, (int) port->id.id,
5592 (int) ret);
5593 }
5594
5595 return ret;
5596}
5597
5598
5599static ssize_t
5600nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5601 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5602{
5603 int err;
5604 ssize_t res;
5605 struct iovec iov[1];
5606 struct msghdr msg;
5607
5608 iov[0].iov_base = (void *) buf;
5609 iov[0].iov_len = buf_size;
5610
5611 msg.msg_name = NULL;

--- 4 unchanged lines hidden (view full) ---

5616 msg.msg_control = (void *) oob;
5617 msg.msg_controllen = oob_size;
5618
5619retry:
5620
5621 res = sendmsg(fd, &msg, 0);
5622
5623 if (nxt_slow_path(res == -1)) {
5624 err = errno;
5625
5626 if (err == EINTR) {
5627 goto retry;
5628 }
5629
5630 /*
5631 * FIXME: This should be "alert" after router graceful shutdown
5632 * implementation.
5633 */
5634 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5635 fd, (int) buf_size, strerror(err), err);
5636
5637 } else {
5638 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5639 (int) res);
5640 }
5641
5642 return res;
5643}
5644
5645
5646static int
5647nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5648 nxt_unit_read_buf_t *rbuf)
5649{
5650 int res, read;
5651 nxt_unit_port_impl_t *port_impl;
5652
5653 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5654
5655 read = 0;
5656
5657retry:
5658
5659 if (port_impl->from_socket > 0) {
5660 if (port_impl->socket_rbuf != NULL
5661 && port_impl->socket_rbuf->size > 0)
5662 {
5663 port_impl->from_socket--;
5664
5665 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5666 port_impl->socket_rbuf->size = 0;
5667
5668 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5669 (int) port->id.pid, (int) port->id.id,
5670 (int) rbuf->size);
5671
5672 return NXT_UNIT_OK;
5673 }
5674
5675 } else {
5676 res = nxt_unit_port_queue_recv(port, rbuf);
5677
5678 if (res == NXT_UNIT_OK) {
5679 if (nxt_unit_is_read_socket(rbuf)) {
5680 port_impl->from_socket++;
5681
5682 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
5683 (int) port->id.pid, (int) port->id.id,
5684 port_impl->from_socket);
5685
5686 goto retry;
5687 }
5688
5689 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
5690 (int) port->id.pid, (int) port->id.id,
5691 (int) rbuf->size);
5692
5693 return NXT_UNIT_OK;
5694 }
5695 }
5696
5697 if (read) {
5698 return NXT_UNIT_AGAIN;
5699 }
5700
5701 res = nxt_unit_port_recv(ctx, port, rbuf);
5702 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5703 return NXT_UNIT_ERROR;
5704 }
5705
5706 read = 1;
5707
5708 if (nxt_unit_is_read_queue(rbuf)) {
5709 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5710 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5711
5712 if (port_impl->from_socket) {
5713 nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
5714 }
5715
5716 goto retry;
5717 }
5718
5719 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
5720 (int) port->id.pid, (int) port->id.id,
5721 (int) rbuf->size);
5722
5723 if (res == NXT_UNIT_AGAIN) {
5724 return NXT_UNIT_AGAIN;
5725 }
5726
5727 if (port_impl->from_socket > 0) {
5728 port_impl->from_socket--;
5729
5730 return NXT_UNIT_OK;
5731 }
5732
5733 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
5734 (int) port->id.pid, (int) port->id.id,
5735 (int) rbuf->size);
5736
5737 if (port_impl->socket_rbuf == NULL) {
5738 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
5739
5740 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
5741 return NXT_UNIT_ERROR;
5742 }
5743
5744 port_impl->socket_rbuf->size = 0;
5745 }
5746
5747 if (port_impl->socket_rbuf->size > 0) {
5748 nxt_unit_alert(ctx, "too many port socket messages");
5749
5750 return NXT_UNIT_ERROR;
5751 }
5752
5753 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
5754
5755 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
5756
5757 goto retry;
5758}
5759
5760
5761nxt_inline void
5762nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
5763{
5764 memcpy(dst->buf, src->buf, src->size);
5765 dst->size = src->size;
5766 memcpy(dst->oob, src->oob, sizeof(src->oob));
5767}
5768
5769
5770static int
5771nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5772 nxt_unit_read_buf_t *rbuf)
5773{
5774 int res;
5775
5776retry:
5777
5778 res = nxt_unit_app_queue_recv(port, rbuf);
5779
5780 if (res == NXT_UNIT_AGAIN) {
5781 res = nxt_unit_port_recv(ctx, port, rbuf);
5782 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5783 return NXT_UNIT_ERROR;
5784 }
5785
5786 if (nxt_unit_is_read_queue(rbuf)) {
5787 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5788 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5789
5790 goto retry;
5791 }
5792 }
5793
5794 return res;
5795}
5796
5797
5798static int
5799nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5800 nxt_unit_read_buf_t *rbuf)
5801{
5802 int fd, err;
5803 struct iovec iov[1];
5804 struct msghdr msg;
5805 nxt_unit_impl_t *lib;
5806
5807 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5808
5809 if (lib->callbacks.port_recv != NULL) {
5810 rbuf->size = lib->callbacks.port_recv(ctx, port,
5811 rbuf->buf, sizeof(rbuf->buf),
5812 rbuf->oob, sizeof(rbuf->oob));
5813
5814 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
5815 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5816
5817 if (nxt_slow_path(rbuf->size < 0)) {
5818 return NXT_UNIT_ERROR;
5819 }
5820
5821 return NXT_UNIT_OK;
5822 }
5823
5824 iov[0].iov_base = rbuf->buf;

--- 17 unchanged lines hidden (view full) ---

5842 err = errno;
5843
5844 if (err == EINTR) {
5845 goto retry;
5846 }
5847
5848 if (err == EAGAIN) {
5849 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
5850 fd, strerror(err), err);
5851
5852 return NXT_UNIT_AGAIN;
5853 }
5854
5855 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
5856 fd, strerror(err), err);
5857
5858 return NXT_UNIT_ERROR;
5859 }
5860
5861 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
5862
5863 return NXT_UNIT_OK;
5864}
5865
5866
5867static int
5868nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
5869{
5870 nxt_unit_port_impl_t *port_impl;
5871
5872 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5873
5874 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
5875
5876 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5877}
5878
5879
5880static int
5881nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
5882{
5883 uint32_t cookie;
5884 nxt_port_msg_t *port_msg;
5885 nxt_app_queue_t *queue;
5886 nxt_unit_port_impl_t *port_impl;
5887
5888 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5889 queue = port_impl->queue;
5890
5891retry:
5892
5893 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
5894
5895 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
5896
5897 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
5898 port_msg = (nxt_port_msg_t *) rbuf->buf;
5899
5900 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
5901 return NXT_UNIT_OK;
5902 }
5903
5904 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
5905
5906 goto retry;
5907 }
5908
5909 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5910}
5911
5912
5913static nxt_int_t
5914nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5915{
5916 nxt_unit_port_t *port;
5917 nxt_unit_port_hash_id_t *port_id;
5918
5919 port = data;
5920 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 112 unchanged lines hidden (view full) ---

6033 NXT_LVLHSH_DEFAULT,
6034 nxt_unit_request_hash_test,
6035 nxt_lvlhsh_alloc,
6036 nxt_lvlhsh_free,
6037};
6038
6039
6040static int
6041nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6042 nxt_unit_request_info_t *req)
6043{
6044 uint32_t *stream;
6045 nxt_int_t res;
6046 nxt_lvlhsh_query_t lhq;
6047 nxt_unit_ctx_impl_t *ctx_impl;
6048 nxt_unit_request_info_impl_t *req_impl;
6049
6050 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6051 if (req_impl->in_hash) {
6052 return NXT_UNIT_OK;
6053 }
6054
6055 stream = &req_impl->stream;
6056
6057 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6058 lhq.key.length = sizeof(*stream);
6059 lhq.key.start = (u_char *) stream;
6060 lhq.proto = &lvlhsh_requests_proto;
6061 lhq.pool = NULL;
6062 lhq.replace = 0;
6063 lhq.value = req_impl;
6064
6065 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6066
6067 pthread_mutex_lock(&ctx_impl->mutex);
6068
6069 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6070
6071 pthread_mutex_unlock(&ctx_impl->mutex);
6072
6073 switch (res) {
6074
6075 case NXT_OK:
6076 req_impl->in_hash = 1;
6077 return NXT_UNIT_OK;
6078
6079 default:
6080 return NXT_UNIT_ERROR;
6081 }
6082}
6083
6084
6085static nxt_unit_request_info_t *
6086nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6087{
6088 nxt_int_t res;
6089 nxt_lvlhsh_query_t lhq;
6090 nxt_unit_ctx_impl_t *ctx_impl;
6091 nxt_unit_request_info_impl_t *req_impl;
6092
6093 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6094 lhq.key.length = sizeof(stream);
6095 lhq.key.start = (u_char *) &stream;
6096 lhq.proto = &lvlhsh_requests_proto;
6097 lhq.pool = NULL;
6098
6099 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6100
6101 pthread_mutex_lock(&ctx_impl->mutex);
6102
6103 if (remove) {
6104 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6105
6106 } else {
6107 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6108 }
6109
6110 pthread_mutex_unlock(&ctx_impl->mutex);
6111
6112 switch (res) {
6113
6114 case NXT_OK:
6115 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6116 req);
6117 req_impl->in_hash = 0;
6118
6119 return lhq.value;
6120
6121 default:
6122 return NULL;
6123 }
6124}
6125
6126

--- 165 unchanged lines hidden ---