9a10,11
> #include "nxt_port_queue.h"
> #include "nxt_app_queue.h"
53c55,56
< static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
---
> static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
> int queue_fd);
58a62,63
> static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
> nxt_unit_recv_msg_t *recv_msg);
94a100
> static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
106,107d111
< static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
< nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
126a131,134
> nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
> nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
> nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
> nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
133c141
< nxt_unit_port_t *port);
---
> nxt_unit_port_t *port, int queue_fd);
138c146
< nxt_unit_port_t *port);
---
> nxt_unit_port_t *port, void *queue);
152a161,166
> static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf);
> nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
> nxt_unit_read_buf_t *src);
> static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf);
154a169,172
> static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf);
> static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf);
161,164c179,182
< static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
< nxt_unit_request_info_impl_t *req_impl);
< static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
< nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
---
> static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
> nxt_unit_request_info_t *req);
> static nxt_unit_request_info_t *nxt_unit_request_hash_find(
> nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
219a238
> uint8_t in_hash;
351a371,375
>
> void *queue;
>
> int from_socket;
> nxt_unit_read_buf_t *socket_rbuf;
378c402,403
< int rc;
---
> int rc, queue_fd;
> void *mem;
388a414,415
> queue_fd = -1;
>
425c452
< lib->router_port = nxt_unit_add_port(ctx, &router_port);
---
> lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
432c459,475
< lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
---
> queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
> if (nxt_slow_path(queue_fd == -1)) {
> goto fail;
> }
>
> mem = mmap(NULL, sizeof(nxt_port_queue_t),
> PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
> if (nxt_slow_path(mem == MAP_FAILED)) {
> nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
> strerror(errno), errno);
>
> goto fail;
> }
>
> nxt_port_queue_init(mem);
>
> lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
435a479,480
> munmap(mem, sizeof(nxt_port_queue_t));
>
439c484
< rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
---
> rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
442a488,489
> munmap(mem, sizeof(nxt_port_queue_t));
>
446a494
> close(queue_fd);
451a500,503
> if (queue_fd != -1) {
> close(queue_fd);
> }
>
499a552
> pthread_mutex_destroy(&lib->mutex);
507a561
> pthread_mutex_destroy(&lib->mutex);
768c822
< nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
---
> nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
773a828,832
> union {
> struct cmsghdr cm;
> char space[CMSG_SPACE(sizeof(int))];
> } cmsg;
>
786c845,863
< res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
---
> memset(&cmsg, 0, sizeof(cmsg));
>
> cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
> cmsg.cm.cmsg_level = SOL_SOCKET;
> cmsg.cm.cmsg_type = SCM_RIGHTS;
>
> /*
> * memcpy() is used instead of simple
> * *(int *) CMSG_DATA(&cmsg.cm) = fd;
> * because GCC 4.4 with -O2/3/s optimization may issue a warning:
> * dereferencing type-punned pointer will break strict-aliasing rules
> *
> * Fortunately, GCC with -O1 compiles this nxt_memcpy()
> * in the same simple assignment as in the code above.
> */
> memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
>
> res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
> &cmsg, sizeof(cmsg));
840a918,921
> nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
> port_msg->stream, (int) port_msg->type,
> recv_msg.fd, recv_msg.fd2);
>
856,868d936
< if (port_msg->tracking) {
< rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
<
< if (nxt_slow_path(rc != NXT_UNIT_OK)) {
< if (rc == NXT_UNIT_AGAIN) {
< recv_msg.fd = -1;
< recv_msg.fd2 = -1;
< }
<
< goto fail;
< }
< }
<
931a1000,1003
> case _NXT_PORT_MSG_REQ_BODY:
> rc = nxt_unit_process_req_body(ctx, &recv_msg);
> break;
>
994a1067
> void *mem;
1016c1089
< nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
---
> nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
1018c1091
< (int) new_port_msg->id, recv_msg->fd);
---
> (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
1027a1101,1103
> mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
> MAP_SHARED, recv_msg->fd2, 0);
>
1043a1120,1122
>
> mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
> MAP_SHARED, recv_msg->fd2, 0);
1045a1125,1127
> if (nxt_slow_path(mem == MAP_FAILED)) {
> nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
> strerror(errno), errno);
1046a1129,1131
> return NXT_UNIT_ERROR;
> }
>
1051c1136
< port = nxt_unit_add_port(ctx, &new_port);
---
> port = nxt_unit_add_port(ctx, &new_port, mem);
1136a1222
> req_impl->in_hash = 0;
1154,1155c1240,1243
< if (nxt_slow_path(res != NXT_UNIT_OK)) {
< return res;
---
> if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
> nxt_unit_request_done(req, NXT_UNIT_ERROR);
>
> return NXT_UNIT_ERROR;
1159a1248,1268
> if (req->content_length
> > (uint64_t) (req->content_buf->end - req->content_buf->free))
> {
> res = nxt_unit_request_hash_add(ctx, req);
> if (nxt_slow_path(res != NXT_UNIT_OK)) {
> nxt_unit_req_warn(req, "failed to add request to hash");
>
> nxt_unit_request_done(req, NXT_UNIT_ERROR);
>
> return NXT_UNIT_ERROR;
> }
>
> /*
> * If application have separate data handler, we may start
> * request processing and process data when it is arrived.
> */
> if (lib->callbacks.data_handler == NULL) {
> return NXT_UNIT_OK;
> }
> }
>
1167a1277,1323
> nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
> {
> uint64_t l;
> nxt_unit_impl_t *lib;
> nxt_unit_mmap_buf_t *b;
> nxt_unit_request_info_t *req;
>
> req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
> if (req == NULL) {
> return NXT_UNIT_OK;
> }
>
> l = req->content_buf->end - req->content_buf->free;
>
> for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
> b->req = req;
> l += b->buf.end - b->buf.free;
> }
>
> if (recv_msg->incoming_buf != NULL) {
> b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
>
> /* "Move" incoming buffer list to req_impl. */
> nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
> recv_msg->incoming_buf = NULL;
> }
>
> req->content_fd = recv_msg->fd;
> recv_msg->fd = -1;
>
> lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
>
> if (lib->callbacks.data_handler != NULL) {
> lib->callbacks.data_handler(req);
>
> return NXT_UNIT_OK;
> }
>
> if (req->content_fd != -1 || l == req->content_length) {
> lib->callbacks.request_handler(req);
> }
>
> return NXT_UNIT_OK;
> }
>
>
> static int
1262a1419,1421
> port_impl->queue = NULL;
> port_impl->from_socket = 0;
> port_impl->socket_rbuf = NULL;
1324d1482
< nxt_unit_ctx_impl_t *ctx_impl;
1330,1334c1488,1489
< ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
<
< req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
< recv_msg->last);
< if (req_impl == NULL) {
---
> req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
> if (nxt_slow_path(req == NULL)) {
1338c1493
< req = &req_impl->req;
---
> req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1504,1507c1659,1660
< if (req_impl->websocket) {
< nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
<
< req_impl->websocket = 0;
---
> if (req_impl->in_hash) {
> nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1509a1663,1664
> req_impl->websocket = 0;
>
2173d2327
< nxt_unit_ctx_impl_t *ctx_impl;
2196,2198c2350
< ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
<
< rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
---
> rc = nxt_unit_request_hash_add(req->ctx, req);
2468a2621,2622
> memset(rbuf->oob, 0, sizeof(struct cmsghdr));
>
2566a2721,2722
> nxt_unit_req_debug(req, "write: %d", (int) size);
>
2745a2902,2903
> nxt_unit_req_debug(req, "read: %d", (int) buf_res);
>
2748c2906
< if (res < 0) {
---
> if (nxt_slow_path(res < 0)) {
3304c3462
< nxt_port_msg_t *port_msg;
---
> int res;
3316,3320c3474,3475
< memset(rbuf->oob, 0, sizeof(struct cmsghdr));
<
< nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
<
< if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
---
> res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
> if (res == NXT_UNIT_ERROR) {
3326,3328c3481
< port_msg = (nxt_port_msg_t *) rbuf->buf;
<
< if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
---
> if (nxt_unit_is_shm_ack(rbuf)) {
3330d3482
<
3340c3492
< if (port_msg->type == _NXT_PORT_MSG_QUIT) {
---
> if (nxt_unit_is_quit(rbuf)) {
3409d3560
< char name[64];
3423,3428c3574
< snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
< lib->pid, (void *) pthread_self());
<
< #if (NXT_HAVE_MEMFD_CREATE)
<
< fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
---
> fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3430,3432d3575
< nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
< strerror(errno), errno);
<
3436,3478d3578
< nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
<
< #elif (NXT_HAVE_SHM_OPEN_ANON)
<
< fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
< if (nxt_slow_path(fd == -1)) {
< nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
< strerror(errno), errno);
<
< goto remove_fail;
< }
<
< #elif (NXT_HAVE_SHM_OPEN)
<
< /* Just in case. */
< shm_unlink(name);
<
< fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
< if (nxt_slow_path(fd == -1)) {
< nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
< strerror(errno), errno);
<
< goto remove_fail;
< }
<
< if (nxt_slow_path(shm_unlink(name) == -1)) {
< nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
< strerror(errno), errno);
< }
<
< #else
<
< #error No working shared memory implementation.
<
< #endif
<
< if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
< nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
< strerror(errno), errno);
<
< goto remove_fail;
< }
<
3483a3584,3585
> close(fd);
>
3535a3638,3711
> nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
> {
> int fd;
> nxt_unit_impl_t *lib;
>
> lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
>
> #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
> char name[64];
>
> snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
> lib->pid, (void *) pthread_self());
> #endif
>
> #if (NXT_HAVE_MEMFD_CREATE)
>
> fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
> if (nxt_slow_path(fd == -1)) {
> nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
> strerror(errno), errno);
>
> return -1;
> }
>
> nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
>
> #elif (NXT_HAVE_SHM_OPEN_ANON)
>
> fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
> if (nxt_slow_path(fd == -1)) {
> nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
> strerror(errno), errno);
>
> return -1;
> }
>
> #elif (NXT_HAVE_SHM_OPEN)
>
> /* Just in case. */
> shm_unlink(name);
>
> fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
> if (nxt_slow_path(fd == -1)) {
> nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
> strerror(errno), errno);
>
> return -1;
> }
>
> if (nxt_slow_path(shm_unlink(name) == -1)) {
> nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
> strerror(errno), errno);
> }
>
> #else
>
> #error No working shared memory implementation.
>
> #endif
>
> if (nxt_slow_path(ftruncate(fd, size) == -1)) {
> nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
> strerror(errno), errno);
>
> close(fd);
>
> return -1;
> }
>
> return fd;
> }
>
>
> static int
3800,3854d3975
< nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
< nxt_unit_read_buf_t *rbuf)
< {
< int res;
< nxt_chunk_id_t c;
< nxt_unit_impl_t *lib;
< nxt_port_mmap_header_t *hdr;
< nxt_port_mmap_tracking_msg_t *tracking_msg;
<
< if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
< nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
< recv_msg->stream, (int) recv_msg->size);
<
< return NXT_UNIT_ERROR;
< }
<
< tracking_msg = recv_msg->start;
<
< recv_msg->start = tracking_msg + 1;
< recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
<
< lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
<
< pthread_mutex_lock(&lib->incoming.mutex);
<
< res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
< recv_msg->pid, tracking_msg->mmap_id,
< &hdr, rbuf);
<
< if (nxt_slow_path(res != NXT_UNIT_OK)) {
< return res;
< }
<
< c = tracking_msg->tracking_id;
< res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
<
< if (res == 0) {
< nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
< recv_msg->stream);
<
< nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
<
< res = NXT_UNIT_CANCELLED;
<
< } else {
< res = NXT_UNIT_OK;
< }
<
< pthread_mutex_unlock(&lib->incoming.mutex);
<
< return res;
< }
<
<
< static int
4157c4278
< process->use_count = 1;
---
> process->use_count = 2;
4179,4180d4299
< nxt_unit_process_use(process);
<
4296,4299c4415,4419
< int res, err;
< nxt_unit_impl_t *lib;
< nxt_unit_ctx_impl_t *ctx_impl;
< struct pollfd fds[2];
---
> int nevents, res, err;
> nxt_unit_impl_t *lib;
> nxt_unit_ctx_impl_t *ctx_impl;
> nxt_unit_port_impl_t *port_impl;
> struct pollfd fds[2];
4304,4305d4423
< memset(rbuf->oob, 0, sizeof(struct cmsghdr));
<
4307c4425,4426
< return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
---
>
> return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4309a4429,4431
> port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
> port);
>
4311a4434,4460
> if (port_impl->from_socket == 0) {
> res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
> if (res == NXT_UNIT_OK) {
> if (nxt_unit_is_read_socket(rbuf)) {
> port_impl->from_socket++;
>
> nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
> (int) ctx_impl->read_port->id.pid,
> (int) ctx_impl->read_port->id.id,
> port_impl->from_socket);
>
> } else {
> nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
> (int) ctx_impl->read_port->id.pid,
> (int) ctx_impl->read_port->id.id,
> (int) rbuf->size);
>
> return NXT_UNIT_OK;
> }
> }
> }
>
> res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
> if (res == NXT_UNIT_OK) {
> return NXT_UNIT_OK;
> }
>
4320,4321c4469,4470
< res = poll(fds, 2, -1);
< if (nxt_slow_path(res < 0)) {
---
> nevents = poll(fds, 2, -1);
> if (nxt_slow_path(nevents == -1)) {
4328,4329c4477,4478
< nxt_unit_alert(ctx, "poll() failed: %s (%d)",
< strerror(err), err);
---
> nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
> fds[0].fd, fds[1].fd, strerror(err), err);
4335a4485,4488
> nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
> fds[0].fd, fds[1].fd, nevents, fds[0].revents,
> fds[1].revents);
>
4337c4490,4495
< return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
---
> res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
> if (res == NXT_UNIT_AGAIN) {
> goto retry;
> }
>
> return res;
4341c4499,4504
< return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
---
> res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
> if (res == NXT_UNIT_AGAIN) {
> goto retry;
> }
>
> return res;
4344c4507,4509
< rbuf->size = -1;
---
> nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
> fds[0].fd, fds[1].fd, nevents, fds[0].revents,
> fds[1].revents);
4394a4560
> int res;
4397a4564
> nxt_unit_request_info_t *req;
4422c4589
< (void) nxt_unit_send_req_headers_ack(&req_impl->req);
---
> req = &req_impl->req;
4423a4591,4618
> res = nxt_unit_send_req_headers_ack(req);
> if (nxt_slow_path(res != NXT_UNIT_OK)) {
> nxt_unit_request_done(req, NXT_UNIT_ERROR);
>
> continue;
> }
>
> if (req->content_length
> > (uint64_t) (req->content_buf->end - req->content_buf->free))
> {
> res = nxt_unit_request_hash_add(ctx, req);
> if (nxt_slow_path(res != NXT_UNIT_OK)) {
> nxt_unit_req_warn(req, "failed to add request to hash");
>
> nxt_unit_request_done(req, NXT_UNIT_ERROR);
>
> continue;
> }
>
> /*
> * If application have separate data handler, we may start
> * request processing and process data when it is arrived.
> */
> if (lib->callbacks.data_handler == NULL) {
> continue;
> }
> }
>
4434a4630
> nxt_unit_read_buf_t *rbuf;
4445c4641,4645
< rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
---
> rbuf = nxt_unit_read_buf_get(ctx);
> if (nxt_slow_path(rbuf == NULL)) {
> rc = NXT_UNIT_ERROR;
> break;
> }
4446a4647,4654
> retry:
>
> rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
> if (rc == NXT_UNIT_AGAIN) {
> goto retry;
> }
>
> rc = nxt_unit_process_msg(ctx, rbuf);
4449a4658,4664
>
> rc = nxt_unit_process_pending_rbuf(ctx);
> if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
> break;
> }
>
> nxt_unit_process_ready_req(ctx);
4457a4673,4728
> nxt_inline int
> nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
> {
> nxt_port_msg_t *port_msg;
>
> if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
> port_msg = (nxt_port_msg_t *) rbuf->buf;
>
> return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
> }
>
> return 0;
> }
>
>
> nxt_inline int
> nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
> {
> if (nxt_fast_path(rbuf->size == 1)) {
> return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
> }
>
> return 0;
> }
>
>
> nxt_inline int
> nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
> {
> nxt_port_msg_t *port_msg;
>
> if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
> port_msg = (nxt_port_msg_t *) rbuf->buf;
>
> return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
> }
>
> return 0;
> }
>
>
> nxt_inline int
> nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
> {
> nxt_port_msg_t *port_msg;
>
> if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
> port_msg = (nxt_port_msg_t *) rbuf->buf;
>
> return port_msg->type == _NXT_PORT_MSG_QUIT;
> }
>
> return 0;
> }
>
>
4461,4462c4732,4734
< int rc;
< nxt_unit_impl_t *lib;
---
> int rc;
> nxt_unit_impl_t *lib;
> nxt_unit_read_buf_t *rbuf;
4470c4742,4746
< rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
---
> rbuf = nxt_unit_read_buf_get(ctx);
> if (nxt_slow_path(rbuf == NULL)) {
> rc = NXT_UNIT_ERROR;
> break;
> }
4471a4748,4754
> retry:
>
> rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
> if (rc == NXT_UNIT_AGAIN) {
> goto retry;
> }
>
4472a4756
> nxt_unit_read_buf_release(ctx, rbuf);
4474a4759,4770
>
> rc = nxt_unit_process_msg(ctx, rbuf);
> if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
> break;
> }
>
> rc = nxt_unit_process_pending_rbuf(ctx);
> if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
> break;
> }
>
> nxt_unit_process_ready_req(ctx);
4501a4798
> nxt_unit_impl_t *lib;
4509c4806
< memset(rbuf->oob, 0, sizeof(struct cmsghdr));
---
> lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4511,4512c4808,4817
< rc = nxt_unit_port_recv(ctx, port, rbuf);
< if (nxt_slow_path(rc != NXT_UNIT_OK)) {
---
> retry:
>
> if (port == lib->shared_port) {
> rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
>
> } else {
> rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
> }
>
> if (rc != NXT_UNIT_OK) {
4528a4834,4842
> rbuf = nxt_unit_read_buf_get(ctx);
> if (nxt_slow_path(rbuf == NULL)) {
> return NXT_UNIT_ERROR;
> }
>
> if (lib->online) {
> goto retry;
> }
>
4543,4546c4857,4862
< int rc;
< nxt_unit_impl_t *lib;
< nxt_unit_port_t *port;
< nxt_unit_ctx_impl_t *new_ctx;
---
> int rc, queue_fd;
> void *mem;
> nxt_unit_impl_t *lib;
> nxt_unit_port_t *port;
> nxt_unit_ctx_impl_t *new_ctx;
> nxt_unit_port_impl_t *port_impl;
4556a4873,4881
> rc = nxt_unit_ctx_init(lib, new_ctx, data);
> if (nxt_slow_path(rc != NXT_UNIT_OK)) {
> free(new_ctx);
>
> return NULL;
> }
>
> queue_fd = -1;
>
4559c4884,4885
< free(new_ctx);
---
> goto fail;
> }
4561c4887,4891
< return NULL;
---
> new_ctx->read_port = port;
>
> queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
> if (nxt_slow_path(queue_fd == -1)) {
> goto fail;
4564,4565c4894,4899
< rc = nxt_unit_send_port(ctx, lib->router_port, port);
< if (nxt_slow_path(rc != NXT_UNIT_OK)) {
---
> mem = mmap(NULL, sizeof(nxt_port_queue_t),
> PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
> if (nxt_slow_path(mem == MAP_FAILED)) {
> nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
> strerror(errno), errno);
>
4569c4903,4908
< rc = nxt_unit_ctx_init(lib, new_ctx, data);
---
> nxt_port_queue_init(mem);
>
> port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
> port_impl->queue = mem;
>
> rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4574c4913
< new_ctx->read_port = port;
---
> close(queue_fd);
4580,4581c4919,4921
< nxt_unit_remove_port(lib, &port->id);
< nxt_unit_port_release(port);
---
> if (queue_fd != -1) {
> close(queue_fd);
> }
4583c4923
< free(new_ctx);
---
> nxt_unit_ctx_release(&new_ctx->ctx);
4635a4976
> nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
4712c5053
< port = nxt_unit_add_port(ctx, &new_port);
---
> port = nxt_unit_add_port(ctx, &new_port, NULL);
4714,4715d5054
< nxt_unit_alert(ctx, "create_port: add_port() failed");
<
4726c5065
< nxt_unit_port_t *port)
---
> nxt_unit_port_t *port, int queue_fd)
4729a5069
> int fds[2] = { port->out_fd, queue_fd };
4738c5078
< char space[CMSG_SPACE(sizeof(int))];
---
> char space[CMSG_SPACE(sizeof(int) * 2)];
4761c5101
< cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
---
> cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
4774c5114
< memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
---
> memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
4802c5142
< nxt_unit_debug(NULL, "destroy port %d,%d",
---
> nxt_unit_debug(NULL, "destroy port{%d,%d}",
4818a5159,5176
> if (port->in_fd != -1) {
> close(port->in_fd);
>
> port->in_fd = -1;
> }
>
> if (port->out_fd != -1) {
> close(port->out_fd);
>
> port->out_fd = -1;
> }
>
> if (port_impl->queue != NULL) {
> munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
> ? sizeof(nxt_app_queue_t)
> : sizeof(nxt_port_queue_t));
> }
>
4825c5183
< nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
---
> nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
4843,4845c5201,5204
< nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
< port->id.pid, port->id.id,
< port->in_fd, port->out_fd);
---
> nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
> "in_fd %d out_fd %d queue %p",
> port->id.pid, port->id.id,
> port->in_fd, port->out_fd, queue);
4877a5237,5240
> if (old_port_impl->queue == NULL) {
> old_port_impl->queue = queue;
> }
>
4917c5280
< nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
---
> nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
4919c5282
< port->in_fd, port->out_fd);
---
> port->in_fd, port->out_fd, queue);
4931a5295,5297
> nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
> port->id.pid, port->id.id);
>
4953a5320,5322
> new_port->queue = queue;
> new_port->from_socket = 0;
> new_port->socket_rbuf = NULL;
5013c5382
< nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
---
> nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5019c5388
< nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
---
> nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5092c5461,5462
< lib->online = 0;
---
> if (lib->online) {
> lib->online = 0;
5094,5095c5464,5466
< if (lib->callbacks.quit != NULL) {
< lib->callbacks.quit(ctx);
---
> if (lib->callbacks.quit != NULL) {
> lib->callbacks.quit(ctx);
> }
5140c5511,5516
< nxt_unit_impl_t *lib;
---
> int notify;
> ssize_t ret;
> nxt_int_t rc;
> nxt_port_msg_t msg;
> nxt_unit_impl_t *lib;
> nxt_unit_port_impl_t *port_impl;
5142,5144d5517
< nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
< (int) port->id.pid, (int) port->id.id, port->out_fd);
<
5146a5520,5577
> port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
> if (port_impl->queue != NULL && oob_size == 0
> && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
> {
> rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
> if (nxt_slow_path(rc != NXT_OK)) {
> nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
> (int) port->id.pid, (int) port->id.id);
>
> return -1;
> }
>
> nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
> (int) port->id.pid, (int) port->id.id,
> (int) buf_size, notify);
>
> if (notify) {
> memcpy(&msg, buf, sizeof(nxt_port_msg_t));
>
> msg.type = _NXT_PORT_MSG_READ_QUEUE;
>
> if (lib->callbacks.port_send == NULL) {
> ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
> sizeof(nxt_port_msg_t), NULL, 0);
>
> nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
> (int) port->id.pid, (int) port->id.id,
> (int) ret);
>
> } else {
> ret = lib->callbacks.port_send(ctx, port, &msg,
> sizeof(nxt_port_msg_t), NULL, 0);
>
> nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
> (int) port->id.pid, (int) port->id.id,
> (int) ret);
> }
>
> }
>
> return buf_size;
> }
>
> if (port_impl->queue != NULL) {
> msg.type = _NXT_PORT_MSG_READ_SOCKET;
>
> rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
> if (nxt_slow_path(rc != NXT_OK)) {
> nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
> (int) port->id.pid, (int) port->id.id);
>
> return -1;
> }
>
> nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
> (int) port->id.pid, (int) port->id.id, notify);
> }
>
5148,5149c5579,5592
< return lib->callbacks.port_send(ctx, port, buf, buf_size,
< oob, oob_size);
---
> ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
> oob, oob_size);
>
> nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
> (int) port->id.pid, (int) port->id.id,
> (int) ret);
>
> } else {
> ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
> oob, oob_size);
>
> nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
> (int) port->id.pid, (int) port->id.id,
> (int) ret);
5152,5153c5595
< return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
< oob, oob_size);
---
> return ret;
5160a5603
> int err;
5181c5624,5626
< if (errno == EINTR) {
---
> err = errno;
>
> if (err == EINTR) {
5190c5635
< fd, (int) buf_size, strerror(errno), errno);
---
> fd, (int) buf_size, strerror(err), err);
5201a5647,5798
> nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf)
> {
> int res, read;
> nxt_unit_port_impl_t *port_impl;
>
> port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
>
> read = 0;
>
> retry:
>
> if (port_impl->from_socket > 0) {
> if (port_impl->socket_rbuf != NULL
> && port_impl->socket_rbuf->size > 0)
> {
> port_impl->from_socket--;
>
> nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
> port_impl->socket_rbuf->size = 0;
>
> nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
> (int) port->id.pid, (int) port->id.id,
> (int) rbuf->size);
>
> return NXT_UNIT_OK;
> }
>
> } else {
> res = nxt_unit_port_queue_recv(port, rbuf);
>
> if (res == NXT_UNIT_OK) {
> if (nxt_unit_is_read_socket(rbuf)) {
> port_impl->from_socket++;
>
> nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
> (int) port->id.pid, (int) port->id.id,
> port_impl->from_socket);
>
> goto retry;
> }
>
> nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
> (int) port->id.pid, (int) port->id.id,
> (int) rbuf->size);
>
> return NXT_UNIT_OK;
> }
> }
>
> if (read) {
> return NXT_UNIT_AGAIN;
> }
>
> res = nxt_unit_port_recv(ctx, port, rbuf);
> if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
> return NXT_UNIT_ERROR;
> }
>
> read = 1;
>
> if (nxt_unit_is_read_queue(rbuf)) {
> nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
> (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
>
> if (port_impl->from_socket) {
> nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
> }
>
> goto retry;
> }
>
> nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
> (int) port->id.pid, (int) port->id.id,
> (int) rbuf->size);
>
> if (res == NXT_UNIT_AGAIN) {
> return NXT_UNIT_AGAIN;
> }
>
> if (port_impl->from_socket > 0) {
> port_impl->from_socket--;
>
> return NXT_UNIT_OK;
> }
>
> nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
> (int) port->id.pid, (int) port->id.id,
> (int) rbuf->size);
>
> if (port_impl->socket_rbuf == NULL) {
> port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
>
> if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
> return NXT_UNIT_ERROR;
> }
>
> port_impl->socket_rbuf->size = 0;
> }
>
> if (port_impl->socket_rbuf->size > 0) {
> nxt_unit_alert(ctx, "too many port socket messages");
>
> return NXT_UNIT_ERROR;
> }
>
> nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
>
> memset(rbuf->oob, 0, sizeof(struct cmsghdr));
>
> goto retry;
> }
>
>
> nxt_inline void
> nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
> {
> memcpy(dst->buf, src->buf, src->size);
> dst->size = src->size;
> memcpy(dst->oob, src->oob, sizeof(src->oob));
> }
>
>
> static int
> nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
> nxt_unit_read_buf_t *rbuf)
> {
> int res;
>
> retry:
>
> res = nxt_unit_app_queue_recv(port, rbuf);
>
> if (res == NXT_UNIT_AGAIN) {
> res = nxt_unit_port_recv(ctx, port, rbuf);
> if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
> return NXT_UNIT_ERROR;
> }
>
> if (nxt_unit_is_read_queue(rbuf)) {
> nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
> (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
>
> goto retry;
> }
> }
>
> return res;
> }
>
>
> static int
5216a5814,5816
> nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
> (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
>
5250c5850
< fd, strerror(errno), errno);
---
> fd, strerror(err), err);
5256c5856
< fd, strerror(errno), errno);
---
> fd, strerror(err), err);
5266a5867,5912
> static int
> nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
> {
> nxt_unit_port_impl_t *port_impl;
>
> port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
>
> rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
>
> return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
> }
>
>
> static int
> nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
> {
> uint32_t cookie;
> nxt_port_msg_t *port_msg;
> nxt_app_queue_t *queue;
> nxt_unit_port_impl_t *port_impl;
>
> port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
> queue = port_impl->queue;
>
> retry:
>
> rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
>
> nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
>
> if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
> port_msg = (nxt_port_msg_t *) rbuf->buf;
>
> if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
> return NXT_UNIT_OK;
> }
>
> nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
>
> goto retry;
> }
>
> return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
> }
>
>
5395,5396c6041,6042
< nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
< nxt_unit_request_info_impl_t *req_impl)
---
> nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
> nxt_unit_request_info_t *req)
5398,5400c6044,6048
< uint32_t *stream;
< nxt_int_t res;
< nxt_lvlhsh_query_t lhq;
---
> uint32_t *stream;
> nxt_int_t res;
> nxt_lvlhsh_query_t lhq;
> nxt_unit_ctx_impl_t *ctx_impl;
> nxt_unit_request_info_impl_t *req_impl;
5401a6050,6054
> req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
> if (req_impl->in_hash) {
> return NXT_UNIT_OK;
> }
>
5412c6065
< res = nxt_lvlhsh_insert(request_hash, &lhq);
---
> ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5413a6067,6072
> pthread_mutex_lock(&ctx_impl->mutex);
>
> res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
>
> pthread_mutex_unlock(&ctx_impl->mutex);
>
5416a6076
> req_impl->in_hash = 1;
5425,5427c6085,6086
< static nxt_unit_request_info_impl_t *
< nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
< int remove)
---
> static nxt_unit_request_info_t *
> nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
5429,5430c6088,6091
< nxt_int_t res;
< nxt_lvlhsh_query_t lhq;
---
> nxt_int_t res;
> nxt_lvlhsh_query_t lhq;
> nxt_unit_ctx_impl_t *ctx_impl;
> nxt_unit_request_info_impl_t *req_impl;
5437a6099,6102
> ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
>
> pthread_mutex_lock(&ctx_impl->mutex);
>
5439c6104
< res = nxt_lvlhsh_delete(request_hash, &lhq);
---
> res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
5442c6107
< res = nxt_lvlhsh_find(request_hash, &lhq);
---
> res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
5444a6110,6111
> pthread_mutex_unlock(&ctx_impl->mutex);
>
5447a6115,6118
> req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
> req);
> req_impl->in_hash = 0;
>