Deleted Added
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 156 unchanged lines hidden (view full) ---

165static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
166 nxt_unit_read_buf_t *rbuf);
167static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
170 nxt_unit_read_buf_t *rbuf);
171static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
173nxt_inline int nxt_unit_close(int fd);
174
175static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
176 nxt_unit_port_t *port);
177static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
178 nxt_unit_port_id_t *port_id, int remove);
179
180static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
181 nxt_unit_request_info_t *req);

--- 304 unchanged lines hidden (view full) ---

486 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
487 nxt_unit_alert(NULL, "failed to send READY message");
488
489 munmap(mem, sizeof(nxt_port_queue_t));
490
491 goto fail;
492 }
493
494 nxt_unit_close(ready_port.out_fd);
495 nxt_unit_close(queue_fd);
496
497 return ctx;
498
499fail:
500
501 if (queue_fd != -1) {
502 nxt_unit_close(queue_fd);
503 }
504
505 nxt_unit_ctx_release(&lib->main_ctx.ctx);
506
507 return NULL;
508}
509
510

--- 523 unchanged lines hidden (view full) ---

1034 port_msg->stream, (int) port_msg->type);
1035
1036 goto fail;
1037 }
1038
1039fail:
1040
1041 if (recv_msg.fd != -1) {
1042 nxt_unit_close(recv_msg.fd);
1043 }
1044
1045 if (recv_msg.fd2 != -1) {
1046 nxt_unit_close(recv_msg.fd2);
1047 }
1048
1049 while (recv_msg.incoming_buf != NULL) {
1050 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1051 }
1052
1053 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1054#if (NXT_DEBUG)

--- 612 unchanged lines hidden (view full) ---

1667 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1668 }
1669
1670 while (req_impl->incoming_buf != NULL) {
1671 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1672 }
1673
1674 if (req->content_fd != -1) {
1675 nxt_unit_close(req->content_fd);
1676
1677 req->content_fd = -1;
1678 }
1679
1680 if (req->response_port != NULL) {
1681 nxt_unit_port_release(req->response_port);
1682
1683 req->response_port = NULL;

--- 1223 unchanged lines hidden (view full) ---

2907 if (nxt_slow_path(res < 0)) {
2908 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2909 strerror(errno), errno);
2910
2911 return res;
2912 }
2913
2914 if (res < (ssize_t) size) {
2915 nxt_unit_close(req->content_fd);
2916
2917 req->content_fd = -1;
2918 }
2919
2920 req->content_length -= res;
2921 size -= res;
2922
2923 dst = nxt_pointer_to(dst, res);

--- 95 unchanged lines hidden (view full) ---

3019 strerror(errno), errno);
3020
3021 nxt_unit_mmap_buf_free(mmap_buf);
3022
3023 return NULL;
3024 }
3025
3026 if (res < (ssize_t) size) {
3027 nxt_unit_close(req->content_fd);
3028
3029 req->content_fd = -1;
3030 }
3031
3032 nxt_unit_req_debug(req, "preread: read %d", (int) res);
3033
3034 mmap_buf->buf.end = mmap_buf->buf.free + res;
3035

--- 541 unchanged lines hidden (view full) ---

3577 goto remove_fail;
3578 }
3579
3580 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3581 if (nxt_slow_path(mem == MAP_FAILED)) {
3582 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3583 strerror(errno), errno);
3584
3585 nxt_unit_close(fd);
3586
3587 goto remove_fail;
3588 }
3589
3590 mm->hdr = mem;
3591 hdr = mem;
3592
3593 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));

--- 20 unchanged lines hidden (view full) ---

3614 munmap(mem, PORT_MMAP_SIZE);
3615 hdr = NULL;
3616
3617 } else {
3618 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3619 hdr->id, (int) lib->pid, (int) port->id.pid);
3620 }
3621
3622 nxt_unit_close(fd);
3623
3624 pthread_mutex_lock(&lib->outgoing.mutex);
3625
3626 if (nxt_fast_path(hdr != NULL)) {
3627 return hdr;
3628 }
3629
3630remove_fail:

--- 64 unchanged lines hidden (view full) ---

3695#error No working shared memory implementation.
3696
3697#endif
3698
3699 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3700 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3701 strerror(errno), errno);
3702
3703 nxt_unit_close(fd);
3704
3705 return -1;
3706 }
3707
3708 return fd;
3709}
3710
3711

--- 1194 unchanged lines hidden (view full) ---

4906 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4907 port_impl->queue = mem;
4908
4909 rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4910 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4911 goto fail;
4912 }
4913
4914 nxt_unit_close(queue_fd);
4915
4916 return &new_ctx->ctx;
4917
4918fail:
4919
4920 if (queue_fd != -1) {
4921 nxt_unit_close(queue_fd);
4922 }
4923
4924 nxt_unit_ctx_release(&new_ctx->ctx);
4925
4926 return NULL;
4927}
4928
4929

--- 100 unchanged lines hidden (view full) ---

5030 port_sockets[0], port_sockets[1]);
5031
5032 pthread_mutex_lock(&lib->mutex);
5033
5034 process = nxt_unit_process_get(lib, lib->pid);
5035 if (nxt_slow_path(process == NULL)) {
5036 pthread_mutex_unlock(&lib->mutex);
5037
5038 nxt_unit_close(port_sockets[0]);
5039 nxt_unit_close(port_sockets[1]);
5040
5041 return NULL;
5042 }
5043
5044 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5045
5046 new_port.in_fd = port_sockets[0];
5047 new_port.out_fd = port_sockets[1];
5048 new_port.data = NULL;
5049
5050 pthread_mutex_unlock(&lib->mutex);
5051
5052 nxt_unit_process_release(process);
5053
5054 port = nxt_unit_add_port(ctx, &new_port, NULL);
5055 if (nxt_slow_path(port == NULL)) {
5056 nxt_unit_close(port_sockets[0]);
5057 nxt_unit_close(port_sockets[1]);
5058 }
5059
5060 return port;
5061}
5062
5063
5064static int
5065nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,

--- 69 unchanged lines hidden (view full) ---

5135 long c;
5136 nxt_unit_port_impl_t *port_impl;
5137
5138 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5139
5140 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5141
5142 if (c == 1) {
5143 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5144 (int) port->id.pid, (int) port->id.id,
5145 port->in_fd, port->out_fd);
5146
5147 nxt_unit_process_release(port_impl->process);
5148
5149 if (port->in_fd != -1) {
5150 nxt_unit_close(port->in_fd);
5151
5152 port->in_fd = -1;
5153 }
5154
5155 if (port->out_fd != -1) {
5156 nxt_unit_close(port->out_fd);
5157
5158 port->out_fd = -1;
5159 }
5160
5161 if (port_impl->queue != NULL) {
5162 munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
5163 ? sizeof(nxt_app_queue_t)
5164 : sizeof(nxt_port_queue_t));
5165 }
5166
5167 free(port_impl);
5168 }

--- 30 unchanged lines hidden (view full) ---

5199 }
5200
5201 if (old_port->in_fd == -1) {
5202 old_port->in_fd = port->in_fd;
5203 port->in_fd = -1;
5204 }
5205
5206 if (port->in_fd != -1) {
5207 nxt_unit_close(port->in_fd);
5208 port->in_fd = -1;
5209 }
5210
5211 if (old_port->out_fd == -1) {
5212 old_port->out_fd = port->out_fd;
5213 port->out_fd = -1;
5214 }
5215
5216 if (port->out_fd != -1) {
5217 nxt_unit_close(port->out_fd);
5218 port->out_fd = -1;
5219 }
5220
5221 *port = *old_port;
5222
5223 nxt_queue_init(&awaiting_req);
5224
5225 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);

--- 669 unchanged lines hidden (view full) ---

5895
5896 goto retry;
5897 }
5898
5899 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5900}
5901
5902
5903nxt_inline int
5904nxt_unit_close(int fd)
5905{
5906 int res;
5907
5908 res = close(fd);
5909
5910 if (nxt_slow_path(res == -1)) {
5911 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
5912 fd, strerror(errno), errno);
5913
5914 } else {
5915 nxt_unit_debug(NULL, "close(%d): %d", fd, res);
5916 }
5917
5918 return res;
5919}
5920
5921
5922static nxt_int_t
5923nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5924{
5925 nxt_unit_port_t *port;
5926 nxt_unit_port_hash_id_t *port_id;
5927
5928 port = data;
5929 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 371 unchanged lines hidden ---