9a10,11 > #include "nxt_port_queue.h" > #include "nxt_app_queue.h" 53c55,56 < static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); --- > static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, > int queue_fd); 58a62,63 > static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, > nxt_unit_recv_msg_t *recv_msg); 94a100 > static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 106,107d111 < static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, < nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 126a131,134 > nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); > nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); > nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); > nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 133c141 < nxt_unit_port_t *port); --- > nxt_unit_port_t *port, int queue_fd); 138c146 < nxt_unit_port_t *port); --- > nxt_unit_port_t *port, void *queue); 152a161,166 > static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf); > nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, > nxt_unit_read_buf_t *src); > static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf); 154a169,172 > static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf); > static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf); 161,164c179,182 < static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, < nxt_unit_request_info_impl_t *req_impl); < static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( < nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); --- > static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, > nxt_unit_request_info_t *req); > static nxt_unit_request_info_t *nxt_unit_request_hash_find( > nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 219a238 > uint8_t in_hash; 351a371,375 > > void *queue; > > int from_socket; > nxt_unit_read_buf_t *socket_rbuf; 378c402,403 < int rc; --- > int rc, queue_fd; > void *mem; 388a414,415 > queue_fd = -1; > 425c452 < lib->router_port = nxt_unit_add_port(ctx, &router_port); --- > lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); 432c459,475 < lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); --- > queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); > if (nxt_slow_path(queue_fd == -1)) { > goto fail; > } > > mem = mmap(NULL, sizeof(nxt_port_queue_t), > PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); > if (nxt_slow_path(mem == MAP_FAILED)) { > nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, > strerror(errno), errno); > > goto fail; > } > > nxt_port_queue_init(mem); > > lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); 435a479,480 > munmap(mem, sizeof(nxt_port_queue_t)); > 439c484 < rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); --- > rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); 442a488,489 > munmap(mem, sizeof(nxt_port_queue_t)); > 446a494 > close(queue_fd); 451a500,503 > if (queue_fd != -1) { > close(queue_fd); > } > 499a552 > pthread_mutex_destroy(&lib->mutex); 507a561 > pthread_mutex_destroy(&lib->mutex); 768c822 < nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) --- > nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) 773a828,832 > union { > struct cmsghdr cm; > char space[CMSG_SPACE(sizeof(int))]; > } cmsg; > 786c845,863 < res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); --- > memset(&cmsg, 0, sizeof(cmsg)); > > cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); > cmsg.cm.cmsg_level = SOL_SOCKET; > cmsg.cm.cmsg_type = SCM_RIGHTS; > > /* > * memcpy() is used instead of simple > * *(int *) CMSG_DATA(&cmsg.cm) = fd; > * because GCC 4.4 with -O2/3/s optimization may issue a warning: > * dereferencing type-punned pointer will break strict-aliasing rules > * > * Fortunately, GCC with -O1 compiles this nxt_memcpy() > * in the same simple assignment as in the code above. > */ > memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); > > res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), > &cmsg, sizeof(cmsg)); 840a918,921 > nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", > port_msg->stream, (int) port_msg->type, > recv_msg.fd, recv_msg.fd2); > 856,868d936 < if (port_msg->tracking) { < rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); < < if (nxt_slow_path(rc != NXT_UNIT_OK)) { < if (rc == NXT_UNIT_AGAIN) { < recv_msg.fd = -1; < recv_msg.fd2 = -1; < } < < goto fail; < } < } < 931a1000,1003 > case _NXT_PORT_MSG_REQ_BODY: > rc = nxt_unit_process_req_body(ctx, &recv_msg); > break; > 994a1067 > void *mem; 1016c1089 < nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", --- > nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", 1018c1091 < (int) new_port_msg->id, recv_msg->fd); --- > (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); 1027a1101,1103 > mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, > MAP_SHARED, recv_msg->fd2, 0); > 1043a1120,1122 > > mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, > MAP_SHARED, recv_msg->fd2, 0); 1045a1125,1127 > if (nxt_slow_path(mem == MAP_FAILED)) { > nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, > strerror(errno), errno); 1046a1129,1131 > return NXT_UNIT_ERROR; > } > 1051c1136 < port = nxt_unit_add_port(ctx, &new_port); --- > port = nxt_unit_add_port(ctx, &new_port, mem); 1136a1222 > req_impl->in_hash = 0; 1154,1155c1240,1243 < if (nxt_slow_path(res != NXT_UNIT_OK)) { < return res; --- > if (nxt_slow_path(res == NXT_UNIT_ERROR)) { > nxt_unit_request_done(req, NXT_UNIT_ERROR); > > return NXT_UNIT_ERROR; 1159a1248,1268 > if (req->content_length > > (uint64_t) (req->content_buf->end - req->content_buf->free)) > { > res = nxt_unit_request_hash_add(ctx, req); > if (nxt_slow_path(res != NXT_UNIT_OK)) { > nxt_unit_req_warn(req, "failed to add request to hash"); > > nxt_unit_request_done(req, NXT_UNIT_ERROR); > > return NXT_UNIT_ERROR; > } > > /* > * If application have separate data handler, we may start > * request processing and process data when it is arrived. > */ > if (lib->callbacks.data_handler == NULL) { > return NXT_UNIT_OK; > } > } > 1167a1277,1323 > nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) > { > uint64_t l; > nxt_unit_impl_t *lib; > nxt_unit_mmap_buf_t *b; > nxt_unit_request_info_t *req; > > req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); > if (req == NULL) { > return NXT_UNIT_OK; > } > > l = req->content_buf->end - req->content_buf->free; > > for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { > b->req = req; > l += b->buf.end - b->buf.free; > } > > if (recv_msg->incoming_buf != NULL) { > b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); > > /* "Move" incoming buffer list to req_impl. */ > nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); > recv_msg->incoming_buf = NULL; > } > > req->content_fd = recv_msg->fd; > recv_msg->fd = -1; > > lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); > > if (lib->callbacks.data_handler != NULL) { > lib->callbacks.data_handler(req); > > return NXT_UNIT_OK; > } > > if (req->content_fd != -1 || l == req->content_length) { > lib->callbacks.request_handler(req); > } > > return NXT_UNIT_OK; > } > > > static int 1262a1419,1421 > port_impl->queue = NULL; > port_impl->from_socket = 0; > port_impl->socket_rbuf = NULL; 1324d1482 < nxt_unit_ctx_impl_t *ctx_impl; 1330,1334c1488,1489 < ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); < < req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, < recv_msg->last); < if (req_impl == NULL) { --- > req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); > if (nxt_slow_path(req == NULL)) { 1338c1493 < req = &req_impl->req; --- > req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1504,1507c1659,1660 < if (req_impl->websocket) { < nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); < < req_impl->websocket = 0; --- > if (req_impl->in_hash) { > nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); 1509a1663,1664 > req_impl->websocket = 0; > 2173d2327 < nxt_unit_ctx_impl_t *ctx_impl; 2196,2198c2350 < ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); < < rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); --- > rc = nxt_unit_request_hash_add(req->ctx, req); 2468a2621,2622 > memset(rbuf->oob, 0, sizeof(struct cmsghdr)); > 2566a2721,2722 > nxt_unit_req_debug(req, "write: %d", (int) size); > 2745a2902,2903 > nxt_unit_req_debug(req, "read: %d", (int) buf_res); > 2748c2906 < if (res < 0) { --- > if (nxt_slow_path(res < 0)) { 3304c3462 < nxt_port_msg_t *port_msg; --- > int res; 3316,3320c3474,3475 < memset(rbuf->oob, 0, sizeof(struct cmsghdr)); < < nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); < < if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { --- > res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); > if (res == NXT_UNIT_ERROR) { 3326,3328c3481 < port_msg = (nxt_port_msg_t *) rbuf->buf; < < if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { --- > if (nxt_unit_is_shm_ack(rbuf)) { 3330d3482 < 3340c3492 < if (port_msg->type == _NXT_PORT_MSG_QUIT) { --- > if (nxt_unit_is_quit(rbuf)) { 3409d3560 < char name[64]; 3423,3428c3574 < snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", < lib->pid, (void *) pthread_self()); < < #if (NXT_HAVE_MEMFD_CREATE) < < fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); --- > fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); 3430,3432d3575 < nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, < strerror(errno), errno); < 3436,3478d3578 < nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); < < #elif (NXT_HAVE_SHM_OPEN_ANON) < < fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); < if (nxt_slow_path(fd == -1)) { < nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", < strerror(errno), errno); < < goto remove_fail; < } < < #elif (NXT_HAVE_SHM_OPEN) < < /* Just in case. */ < shm_unlink(name); < < fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); < if (nxt_slow_path(fd == -1)) { < nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, < strerror(errno), errno); < < goto remove_fail; < } < < if (nxt_slow_path(shm_unlink(name) == -1)) { < nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, < strerror(errno), errno); < } < < #else < < #error No working shared memory implementation. < < #endif < < if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { < nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, < strerror(errno), errno); < < goto remove_fail; < } < 3483a3584,3585 > close(fd); > 3535a3638,3711 > nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) > { > int fd; > nxt_unit_impl_t *lib; > > lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); > > #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) > char name[64]; > > snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", > lib->pid, (void *) pthread_self()); > #endif > > #if (NXT_HAVE_MEMFD_CREATE) > > fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); > if (nxt_slow_path(fd == -1)) { > nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, > strerror(errno), errno); > > return -1; > } > > nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); > > #elif (NXT_HAVE_SHM_OPEN_ANON) > > fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); > if (nxt_slow_path(fd == -1)) { > nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", > strerror(errno), errno); > > return -1; > } > > #elif (NXT_HAVE_SHM_OPEN) > > /* Just in case. */ > shm_unlink(name); > > fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); > if (nxt_slow_path(fd == -1)) { > nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, > strerror(errno), errno); > > return -1; > } > > if (nxt_slow_path(shm_unlink(name) == -1)) { > nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, > strerror(errno), errno); > } > > #else > > #error No working shared memory implementation. > > #endif > > if (nxt_slow_path(ftruncate(fd, size) == -1)) { > nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, > strerror(errno), errno); > > close(fd); > > return -1; > } > > return fd; > } > > > static int 3800,3854d3975 < nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, < nxt_unit_read_buf_t *rbuf) < { < int res; < nxt_chunk_id_t c; < nxt_unit_impl_t *lib; < nxt_port_mmap_header_t *hdr; < nxt_port_mmap_tracking_msg_t *tracking_msg; < < if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { < nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", < recv_msg->stream, (int) recv_msg->size); < < return NXT_UNIT_ERROR; < } < < tracking_msg = recv_msg->start; < < recv_msg->start = tracking_msg + 1; < recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); < < lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); < < pthread_mutex_lock(&lib->incoming.mutex); < < res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, < recv_msg->pid, tracking_msg->mmap_id, < &hdr, rbuf); < < if (nxt_slow_path(res != NXT_UNIT_OK)) { < return res; < } < < c = tracking_msg->tracking_id; < res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); < < if (res == 0) { < nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", < recv_msg->stream); < < nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); < < res = NXT_UNIT_CANCELLED; < < } else { < res = NXT_UNIT_OK; < } < < pthread_mutex_unlock(&lib->incoming.mutex); < < return res; < } < < < static int 4157c4278 < process->use_count = 1; --- > process->use_count = 2; 4179,4180d4299 < nxt_unit_process_use(process); < 4296,4299c4415,4419 < int res, err; < nxt_unit_impl_t *lib; < nxt_unit_ctx_impl_t *ctx_impl; < struct pollfd fds[2]; --- > int nevents, res, err; > nxt_unit_impl_t *lib; > nxt_unit_ctx_impl_t *ctx_impl; > nxt_unit_port_impl_t *port_impl; > struct pollfd fds[2]; 4304,4305d4423 < memset(rbuf->oob, 0, sizeof(struct cmsghdr)); < 4307c4425,4426 < return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); --- > > return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); 4309a4429,4431 > port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, > port); > 4311a4434,4460 > if (port_impl->from_socket == 0) { > res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); > if (res == NXT_UNIT_OK) { > if (nxt_unit_is_read_socket(rbuf)) { > port_impl->from_socket++; > > nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", > (int) ctx_impl->read_port->id.pid, > (int) ctx_impl->read_port->id.id, > port_impl->from_socket); > > } else { > nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", > (int) ctx_impl->read_port->id.pid, > (int) ctx_impl->read_port->id.id, > (int) rbuf->size); > > return NXT_UNIT_OK; > } > } > } > > res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); > if (res == NXT_UNIT_OK) { > return NXT_UNIT_OK; > } > 4320,4321c4469,4470 < res = poll(fds, 2, -1); < if (nxt_slow_path(res < 0)) { --- > nevents = poll(fds, 2, -1); > if (nxt_slow_path(nevents == -1)) { 4328,4329c4477,4478 < nxt_unit_alert(ctx, "poll() failed: %s (%d)", < strerror(err), err); --- > nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", > fds[0].fd, fds[1].fd, strerror(err), err); 4335a4485,4488 > nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", > fds[0].fd, fds[1].fd, nevents, fds[0].revents, > fds[1].revents); > 4337c4490,4495 < return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); --- > res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); > if (res == NXT_UNIT_AGAIN) { > goto retry; > } > > return res; 4341c4499,4504 < return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); --- > res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); > if (res == NXT_UNIT_AGAIN) { > goto retry; > } > > return res; 4344c4507,4509 < rbuf->size = -1; --- > nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", > fds[0].fd, fds[1].fd, nevents, fds[0].revents, > fds[1].revents); 4394a4560 > int res; 4397a4564 > nxt_unit_request_info_t *req; 4422c4589 < (void) nxt_unit_send_req_headers_ack(&req_impl->req); --- > req = &req_impl->req; 4423a4591,4618 > res = nxt_unit_send_req_headers_ack(req); > if (nxt_slow_path(res != NXT_UNIT_OK)) { > nxt_unit_request_done(req, NXT_UNIT_ERROR); > > continue; > } > > if (req->content_length > > (uint64_t) (req->content_buf->end - req->content_buf->free)) > { > res = nxt_unit_request_hash_add(ctx, req); > if (nxt_slow_path(res != NXT_UNIT_OK)) { > nxt_unit_req_warn(req, "failed to add request to hash"); > > nxt_unit_request_done(req, NXT_UNIT_ERROR); > > continue; > } > > /* > * If application have separate data handler, we may start > * request processing and process data when it is arrived. > */ > if (lib->callbacks.data_handler == NULL) { > continue; > } > } > 4434a4630 > nxt_unit_read_buf_t *rbuf; 4445c4641,4645 < rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); --- > rbuf = nxt_unit_read_buf_get(ctx); > if (nxt_slow_path(rbuf == NULL)) { > rc = NXT_UNIT_ERROR; > break; > } 4446a4647,4654 > retry: > > rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); > if (rc == NXT_UNIT_AGAIN) { > goto retry; > } > > rc = nxt_unit_process_msg(ctx, rbuf); 4449a4658,4664 > > rc = nxt_unit_process_pending_rbuf(ctx); > if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { > break; > } > > nxt_unit_process_ready_req(ctx); 4457a4673,4728 > nxt_inline int > nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) > { > nxt_port_msg_t *port_msg; > > if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { > port_msg = (nxt_port_msg_t *) rbuf->buf; > > return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; > } > > return 0; > } > > > nxt_inline int > nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) > { > if (nxt_fast_path(rbuf->size == 1)) { > return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; > } > > return 0; > } > > > nxt_inline int > nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) > { > nxt_port_msg_t *port_msg; > > if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { > port_msg = (nxt_port_msg_t *) rbuf->buf; > > return port_msg->type == _NXT_PORT_MSG_SHM_ACK; > } > > return 0; > } > > > nxt_inline int > nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) > { > nxt_port_msg_t *port_msg; > > if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { > port_msg = (nxt_port_msg_t *) rbuf->buf; > > return port_msg->type == _NXT_PORT_MSG_QUIT; > } > > return 0; > } > > 4461,4462c4732,4734 < int rc; < nxt_unit_impl_t *lib; --- > int rc; > nxt_unit_impl_t *lib; > nxt_unit_read_buf_t *rbuf; 4470c4742,4746 < rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); --- > rbuf = nxt_unit_read_buf_get(ctx); > if (nxt_slow_path(rbuf == NULL)) { > rc = NXT_UNIT_ERROR; > break; > } 4471a4748,4754 > retry: > > rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); > if (rc == NXT_UNIT_AGAIN) { > goto retry; > } > 4472a4756 > nxt_unit_read_buf_release(ctx, rbuf); 4474a4759,4770 > > rc = nxt_unit_process_msg(ctx, rbuf); > if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { > break; > } > > rc = nxt_unit_process_pending_rbuf(ctx); > if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { > break; > } > > nxt_unit_process_ready_req(ctx); 4501a4798 > nxt_unit_impl_t *lib; 4509c4806 < memset(rbuf->oob, 0, sizeof(struct cmsghdr)); --- > lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4511,4512c4808,4817 < rc = nxt_unit_port_recv(ctx, port, rbuf); < if (nxt_slow_path(rc != NXT_UNIT_OK)) { --- > retry: > > if (port == lib->shared_port) { > rc = nxt_unit_shared_port_recv(ctx, port, rbuf); > > } else { > rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); > } > > if (rc != NXT_UNIT_OK) { 4528a4834,4842 > rbuf = nxt_unit_read_buf_get(ctx); > if (nxt_slow_path(rbuf == NULL)) { > return NXT_UNIT_ERROR; > } > > if (lib->online) { > goto retry; > } > 4543,4546c4857,4862 < int rc; < nxt_unit_impl_t *lib; < nxt_unit_port_t *port; < nxt_unit_ctx_impl_t *new_ctx; --- > int rc, queue_fd; > void *mem; > nxt_unit_impl_t *lib; > nxt_unit_port_t *port; > nxt_unit_ctx_impl_t *new_ctx; > nxt_unit_port_impl_t *port_impl; 4556a4873,4881 > rc = nxt_unit_ctx_init(lib, new_ctx, data); > if (nxt_slow_path(rc != NXT_UNIT_OK)) { > free(new_ctx); > > return NULL; > } > > queue_fd = -1; > 4559c4884,4885 < free(new_ctx); --- > goto fail; > } 4561c4887,4891 < return NULL; --- > new_ctx->read_port = port; > > queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); > if (nxt_slow_path(queue_fd == -1)) { > goto fail; 4564,4565c4894,4899 < rc = nxt_unit_send_port(ctx, lib->router_port, port); < if (nxt_slow_path(rc != NXT_UNIT_OK)) { --- > mem = mmap(NULL, sizeof(nxt_port_queue_t), > PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); > if (nxt_slow_path(mem == MAP_FAILED)) { > nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, > strerror(errno), errno); > 4569c4903,4908 < rc = nxt_unit_ctx_init(lib, new_ctx, data); --- > nxt_port_queue_init(mem); > > port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); > port_impl->queue = mem; > > rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); 4574c4913 < new_ctx->read_port = port; --- > close(queue_fd); 4580,4581c4919,4921 < nxt_unit_remove_port(lib, &port->id); < nxt_unit_port_release(port); --- > if (queue_fd != -1) { > close(queue_fd); > } 4583c4923 < free(new_ctx); --- > nxt_unit_ctx_release(&new_ctx->ctx); 4635a4976 > nxt_unit_remove_port(lib, &ctx_impl->read_port->id); 4712c5053 < port = nxt_unit_add_port(ctx, &new_port); --- > port = nxt_unit_add_port(ctx, &new_port, NULL); 4714,4715d5054 < nxt_unit_alert(ctx, "create_port: add_port() failed"); < 4726c5065 < nxt_unit_port_t *port) --- > nxt_unit_port_t *port, int queue_fd) 4729a5069 > int fds[2] = { port->out_fd, queue_fd }; 4738c5078 < char space[CMSG_SPACE(sizeof(int))]; --- > char space[CMSG_SPACE(sizeof(int) * 2)]; 4761c5101 < cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); --- > cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); 4774c5114 < memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); --- > memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); 4802c5142 < nxt_unit_debug(NULL, "destroy port %d,%d", --- > nxt_unit_debug(NULL, "destroy port{%d,%d}", 4818a5159,5176 > if (port->in_fd != -1) { > close(port->in_fd); > > port->in_fd = -1; > } > > if (port->out_fd != -1) { > close(port->out_fd); > > port->out_fd = -1; > } > > if (port_impl->queue != NULL) { > munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) > ? sizeof(nxt_app_queue_t) > : sizeof(nxt_port_queue_t)); > } > 4825c5183 < nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) --- > nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) 4843,4845c5201,5204 < nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", < port->id.pid, port->id.id, < port->in_fd, port->out_fd); --- > nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " > "in_fd %d out_fd %d queue %p", > port->id.pid, port->id.id, > port->in_fd, port->out_fd, queue); 4877a5237,5240 > if (old_port_impl->queue == NULL) { > old_port_impl->queue = queue; > } > 4917c5280 < nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", --- > nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", 4919c5282 < port->in_fd, port->out_fd); --- > port->in_fd, port->out_fd, queue); 4931a5295,5297 > nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", > port->id.pid, port->id.id); > 4953a5320,5322 > new_port->queue = queue; > new_port->from_socket = 0; > new_port->socket_rbuf = NULL; 5013c5382 < nxt_unit_debug(NULL, "remove_port: port %d,%d not found", --- > nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", 5019c5388 < nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", --- > nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", 5092c5461,5462 < lib->online = 0; --- > if (lib->online) { > lib->online = 0; 5094,5095c5464,5466 < if (lib->callbacks.quit != NULL) { < lib->callbacks.quit(ctx); --- > if (lib->callbacks.quit != NULL) { > lib->callbacks.quit(ctx); > } 5140c5511,5516 < nxt_unit_impl_t *lib; --- > int notify; > ssize_t ret; > nxt_int_t rc; > nxt_port_msg_t msg; > nxt_unit_impl_t *lib; > nxt_unit_port_impl_t *port_impl; 5142,5144d5517 < nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", < (int) port->id.pid, (int) port->id.id, port->out_fd); < 5146a5520,5577 > port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); > if (port_impl->queue != NULL && oob_size == 0 > && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) > { > rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); > if (nxt_slow_path(rc != NXT_OK)) { > nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", > (int) port->id.pid, (int) port->id.id); > > return -1; > } > > nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", > (int) port->id.pid, (int) port->id.id, > (int) buf_size, notify); > > if (notify) { > memcpy(&msg, buf, sizeof(nxt_port_msg_t)); > > msg.type = _NXT_PORT_MSG_READ_QUEUE; > > if (lib->callbacks.port_send == NULL) { > ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, > sizeof(nxt_port_msg_t), NULL, 0); > > nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", > (int) port->id.pid, (int) port->id.id, > (int) ret); > > } else { > ret = lib->callbacks.port_send(ctx, port, &msg, > sizeof(nxt_port_msg_t), NULL, 0); > > nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", > (int) port->id.pid, (int) port->id.id, > (int) ret); > } > > } > > return buf_size; > } > > if (port_impl->queue != NULL) { > msg.type = _NXT_PORT_MSG_READ_SOCKET; > > rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); > if (nxt_slow_path(rc != NXT_OK)) { > nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", > (int) port->id.pid, (int) port->id.id); > > return -1; > } > > nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", > (int) port->id.pid, (int) port->id.id, notify); > } > 5148,5149c5579,5592 < return lib->callbacks.port_send(ctx, port, buf, buf_size, < oob, oob_size); --- > ret = lib->callbacks.port_send(ctx, port, buf, buf_size, > oob, oob_size); > > nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", > (int) port->id.pid, (int) port->id.id, > (int) ret); > > } else { > ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, > oob, oob_size); > > nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", > (int) port->id.pid, (int) port->id.id, > (int) ret); 5152,5153c5595 < return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, < oob, oob_size); --- > return ret; 5160a5603 > int err; 5181c5624,5626 < if (errno == EINTR) { --- > err = errno; > > if (err == EINTR) { 5190c5635 < fd, (int) buf_size, strerror(errno), errno); --- > fd, (int) buf_size, strerror(err), err); 5201a5647,5798 > nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf) > { > int res, read; > nxt_unit_port_impl_t *port_impl; > > port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); > > read = 0; > > retry: > > if (port_impl->from_socket > 0) { > if (port_impl->socket_rbuf != NULL > && port_impl->socket_rbuf->size > 0) > { > port_impl->from_socket--; > > nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); > port_impl->socket_rbuf->size = 0; > > nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", > (int) port->id.pid, (int) port->id.id, > (int) rbuf->size); > > return NXT_UNIT_OK; > } > > } else { > res = nxt_unit_port_queue_recv(port, rbuf); > > if (res == NXT_UNIT_OK) { > if (nxt_unit_is_read_socket(rbuf)) { > port_impl->from_socket++; > > nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", > (int) port->id.pid, (int) port->id.id, > port_impl->from_socket); > > goto retry; > } > > nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", > (int) port->id.pid, (int) port->id.id, > (int) rbuf->size); > > return NXT_UNIT_OK; > } > } > > if (read) { > return NXT_UNIT_AGAIN; > } > > res = nxt_unit_port_recv(ctx, port, rbuf); > if (nxt_slow_path(res == NXT_UNIT_ERROR)) { > return NXT_UNIT_ERROR; > } > > read = 1; > > if (nxt_unit_is_read_queue(rbuf)) { > nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", > (int) port->id.pid, (int) port->id.id, (int) rbuf->size); > > if (port_impl->from_socket) { > nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); > } > > goto retry; > } > > nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", > (int) port->id.pid, (int) port->id.id, > (int) rbuf->size); > > if (res == NXT_UNIT_AGAIN) { > return NXT_UNIT_AGAIN; > } > > if (port_impl->from_socket > 0) { > port_impl->from_socket--; > > return NXT_UNIT_OK; > } > > nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", > (int) port->id.pid, (int) port->id.id, > (int) rbuf->size); > > if (port_impl->socket_rbuf == NULL) { > port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); > > if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { > return NXT_UNIT_ERROR; > } > > port_impl->socket_rbuf->size = 0; > } > > if (port_impl->socket_rbuf->size > 0) { > nxt_unit_alert(ctx, "too many port socket messages"); > > return NXT_UNIT_ERROR; > } > > nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); > > memset(rbuf->oob, 0, sizeof(struct cmsghdr)); > > goto retry; > } > > > nxt_inline void > nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) > { > memcpy(dst->buf, src->buf, src->size); > dst->size = src->size; > memcpy(dst->oob, src->oob, sizeof(src->oob)); > } > > > static int > nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, > nxt_unit_read_buf_t *rbuf) > { > int res; > > retry: > > res = nxt_unit_app_queue_recv(port, rbuf); > > if (res == NXT_UNIT_AGAIN) { > res = nxt_unit_port_recv(ctx, port, rbuf); > if (nxt_slow_path(res == NXT_UNIT_ERROR)) { > return NXT_UNIT_ERROR; > } > > if (nxt_unit_is_read_queue(rbuf)) { > nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", > (int) port->id.pid, (int) port->id.id, (int) rbuf->size); > > goto retry; > } > } > > return res; > } > > > static int 5216a5814,5816 > nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", > (int) port->id.pid, (int) port->id.id, (int) rbuf->size); > 5250c5850 < fd, strerror(errno), errno); --- > fd, strerror(err), err); 5256c5856 < fd, strerror(errno), errno); --- > fd, strerror(err), err); 5266a5867,5912 > static int > nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) > { > nxt_unit_port_impl_t *port_impl; > > port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); > > rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); > > return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; > } > > > static int > nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) > { > uint32_t cookie; > nxt_port_msg_t *port_msg; > nxt_app_queue_t *queue; > nxt_unit_port_impl_t *port_impl; > > port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); > queue = port_impl->queue; > > retry: > > rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); > > nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); > > if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { > port_msg = (nxt_port_msg_t *) rbuf->buf; > > if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { > return NXT_UNIT_OK; > } > > nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); > > goto retry; > } > > return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; > } > > 5395,5396c6041,6042 < nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, < nxt_unit_request_info_impl_t *req_impl) --- > nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, > nxt_unit_request_info_t *req) 5398,5400c6044,6048 < uint32_t *stream; < nxt_int_t res; < nxt_lvlhsh_query_t lhq; --- > uint32_t *stream; > nxt_int_t res; > nxt_lvlhsh_query_t lhq; > nxt_unit_ctx_impl_t *ctx_impl; > nxt_unit_request_info_impl_t *req_impl; 5401a6050,6054 > req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); > if (req_impl->in_hash) { > return NXT_UNIT_OK; > } > 5412c6065 < res = nxt_lvlhsh_insert(request_hash, &lhq); --- > ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5413a6067,6072 > pthread_mutex_lock(&ctx_impl->mutex); > > res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); > > pthread_mutex_unlock(&ctx_impl->mutex); > 5416a6076 > req_impl->in_hash = 1; 5425,5427c6085,6086 < static nxt_unit_request_info_impl_t * < nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, < int remove) --- > static nxt_unit_request_info_t * > nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) 5429,5430c6088,6091 < nxt_int_t res; < nxt_lvlhsh_query_t lhq; --- > nxt_int_t res; > nxt_lvlhsh_query_t lhq; > nxt_unit_ctx_impl_t *ctx_impl; > nxt_unit_request_info_impl_t *req_impl; 5437a6099,6102 > ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); > > pthread_mutex_lock(&ctx_impl->mutex); > 5439c6104 < res = nxt_lvlhsh_delete(request_hash, &lhq); --- > res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); 5442c6107 < res = nxt_lvlhsh_find(request_hash, &lhq); --- > res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); 5444a6110,6111 > pthread_mutex_unlock(&ctx_impl->mutex); > 5447a6115,6118 > req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, > req); > req_impl->in_hash = 0; >