nxt_unit.c (1555:1d84b9e4b459) nxt_unit.c (1556:2f09c86168d9)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 156 unchanged lines hidden (view full) ---

165static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
166 nxt_unit_read_buf_t *rbuf);
167static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
170 nxt_unit_read_buf_t *rbuf);
171static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"

--- 156 unchanged lines hidden (view full) ---

165static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
166 nxt_unit_read_buf_t *rbuf);
167static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
170 nxt_unit_read_buf_t *rbuf);
171static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
173nxt_inline int nxt_unit_close(int fd);
173
174static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
175 nxt_unit_port_t *port);
176static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
177 nxt_unit_port_id_t *port_id, int remove);
178
179static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
180 nxt_unit_request_info_t *req);

--- 304 unchanged lines hidden (view full) ---

485 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
486 nxt_unit_alert(NULL, "failed to send READY message");
487
488 munmap(mem, sizeof(nxt_port_queue_t));
489
490 goto fail;
491 }
492
174
175static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
176 nxt_unit_port_t *port);
177static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
178 nxt_unit_port_id_t *port_id, int remove);
179
180static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
181 nxt_unit_request_info_t *req);

--- 304 unchanged lines hidden (view full) ---

486 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
487 nxt_unit_alert(NULL, "failed to send READY message");
488
489 munmap(mem, sizeof(nxt_port_queue_t));
490
491 goto fail;
492 }
493
493 close(ready_port.out_fd);
494 close(queue_fd);
494 nxt_unit_close(ready_port.out_fd);
495 nxt_unit_close(queue_fd);
495
496 return ctx;
497
498fail:
499
500 if (queue_fd != -1) {
496
497 return ctx;
498
499fail:
500
501 if (queue_fd != -1) {
501 close(queue_fd);
502 nxt_unit_close(queue_fd);
502 }
503
504 nxt_unit_ctx_release(&lib->main_ctx.ctx);
505
506 return NULL;
507}
508
509

--- 523 unchanged lines hidden (view full) ---

1033 port_msg->stream, (int) port_msg->type);
1034
1035 goto fail;
1036 }
1037
1038fail:
1039
1040 if (recv_msg.fd != -1) {
503 }
504
505 nxt_unit_ctx_release(&lib->main_ctx.ctx);
506
507 return NULL;
508}
509
510

--- 523 unchanged lines hidden (view full) ---

1034 port_msg->stream, (int) port_msg->type);
1035
1036 goto fail;
1037 }
1038
1039fail:
1040
1041 if (recv_msg.fd != -1) {
1041 close(recv_msg.fd);
1042 nxt_unit_close(recv_msg.fd);
1042 }
1043
1044 if (recv_msg.fd2 != -1) {
1043 }
1044
1045 if (recv_msg.fd2 != -1) {
1045 close(recv_msg.fd2);
1046 nxt_unit_close(recv_msg.fd2);
1046 }
1047
1048 while (recv_msg.incoming_buf != NULL) {
1049 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1050 }
1051
1052 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1053#if (NXT_DEBUG)

--- 612 unchanged lines hidden (view full) ---

1666 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1667 }
1668
1669 while (req_impl->incoming_buf != NULL) {
1670 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1671 }
1672
1673 if (req->content_fd != -1) {
1047 }
1048
1049 while (recv_msg.incoming_buf != NULL) {
1050 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1051 }
1052
1053 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1054#if (NXT_DEBUG)

--- 612 unchanged lines hidden (view full) ---

1667 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1668 }
1669
1670 while (req_impl->incoming_buf != NULL) {
1671 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1672 }
1673
1674 if (req->content_fd != -1) {
1674 close(req->content_fd);
1675 nxt_unit_close(req->content_fd);
1675
1676 req->content_fd = -1;
1677 }
1678
1679 if (req->response_port != NULL) {
1680 nxt_unit_port_release(req->response_port);
1681
1682 req->response_port = NULL;

--- 1223 unchanged lines hidden (view full) ---

2906 if (nxt_slow_path(res < 0)) {
2907 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2908 strerror(errno), errno);
2909
2910 return res;
2911 }
2912
2913 if (res < (ssize_t) size) {
1676
1677 req->content_fd = -1;
1678 }
1679
1680 if (req->response_port != NULL) {
1681 nxt_unit_port_release(req->response_port);
1682
1683 req->response_port = NULL;

--- 1223 unchanged lines hidden (view full) ---

2907 if (nxt_slow_path(res < 0)) {
2908 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2909 strerror(errno), errno);
2910
2911 return res;
2912 }
2913
2914 if (res < (ssize_t) size) {
2914 close(req->content_fd);
2915 nxt_unit_close(req->content_fd);
2915
2916 req->content_fd = -1;
2917 }
2918
2919 req->content_length -= res;
2920 size -= res;
2921
2922 dst = nxt_pointer_to(dst, res);

--- 95 unchanged lines hidden (view full) ---

3018 strerror(errno), errno);
3019
3020 nxt_unit_mmap_buf_free(mmap_buf);
3021
3022 return NULL;
3023 }
3024
3025 if (res < (ssize_t) size) {
2916
2917 req->content_fd = -1;
2918 }
2919
2920 req->content_length -= res;
2921 size -= res;
2922
2923 dst = nxt_pointer_to(dst, res);

--- 95 unchanged lines hidden (view full) ---

3019 strerror(errno), errno);
3020
3021 nxt_unit_mmap_buf_free(mmap_buf);
3022
3023 return NULL;
3024 }
3025
3026 if (res < (ssize_t) size) {
3026 close(req->content_fd);
3027 nxt_unit_close(req->content_fd);
3027
3028 req->content_fd = -1;
3029 }
3030
3031 nxt_unit_req_debug(req, "preread: read %d", (int) res);
3032
3033 mmap_buf->buf.end = mmap_buf->buf.free + res;
3034

--- 541 unchanged lines hidden (view full) ---

3576 goto remove_fail;
3577 }
3578
3579 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3580 if (nxt_slow_path(mem == MAP_FAILED)) {
3581 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3582 strerror(errno), errno);
3583
3028
3029 req->content_fd = -1;
3030 }
3031
3032 nxt_unit_req_debug(req, "preread: read %d", (int) res);
3033
3034 mmap_buf->buf.end = mmap_buf->buf.free + res;
3035

--- 541 unchanged lines hidden (view full) ---

3577 goto remove_fail;
3578 }
3579
3580 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3581 if (nxt_slow_path(mem == MAP_FAILED)) {
3582 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3583 strerror(errno), errno);
3584
3584 close(fd);
3585 nxt_unit_close(fd);
3585
3586 goto remove_fail;
3587 }
3588
3589 mm->hdr = mem;
3590 hdr = mem;
3591
3592 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));

--- 20 unchanged lines hidden (view full) ---

3613 munmap(mem, PORT_MMAP_SIZE);
3614 hdr = NULL;
3615
3616 } else {
3617 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3618 hdr->id, (int) lib->pid, (int) port->id.pid);
3619 }
3620
3586
3587 goto remove_fail;
3588 }
3589
3590 mm->hdr = mem;
3591 hdr = mem;
3592
3593 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));

--- 20 unchanged lines hidden (view full) ---

3614 munmap(mem, PORT_MMAP_SIZE);
3615 hdr = NULL;
3616
3617 } else {
3618 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3619 hdr->id, (int) lib->pid, (int) port->id.pid);
3620 }
3621
3621 close(fd);
3622 nxt_unit_close(fd);
3622
3623 pthread_mutex_lock(&lib->outgoing.mutex);
3624
3625 if (nxt_fast_path(hdr != NULL)) {
3626 return hdr;
3627 }
3628
3629remove_fail:

--- 64 unchanged lines hidden (view full) ---

3694#error No working shared memory implementation.
3695
3696#endif
3697
3698 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3699 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3700 strerror(errno), errno);
3701
3623
3624 pthread_mutex_lock(&lib->outgoing.mutex);
3625
3626 if (nxt_fast_path(hdr != NULL)) {
3627 return hdr;
3628 }
3629
3630remove_fail:

--- 64 unchanged lines hidden (view full) ---

3695#error No working shared memory implementation.
3696
3697#endif
3698
3699 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3700 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3701 strerror(errno), errno);
3702
3702 close(fd);
3703 nxt_unit_close(fd);
3703
3704 return -1;
3705 }
3706
3707 return fd;
3708}
3709
3710

--- 1194 unchanged lines hidden (view full) ---

4905 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4906 port_impl->queue = mem;
4907
4908 rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4909 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4910 goto fail;
4911 }
4912
3704
3705 return -1;
3706 }
3707
3708 return fd;
3709}
3710
3711

--- 1194 unchanged lines hidden (view full) ---

4906 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4907 port_impl->queue = mem;
4908
4909 rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4910 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4911 goto fail;
4912 }
4913
4913 close(queue_fd);
4914 nxt_unit_close(queue_fd);
4914
4915 return &new_ctx->ctx;
4916
4917fail:
4918
4919 if (queue_fd != -1) {
4915
4916 return &new_ctx->ctx;
4917
4918fail:
4919
4920 if (queue_fd != -1) {
4920 close(queue_fd);
4921 nxt_unit_close(queue_fd);
4921 }
4922
4923 nxt_unit_ctx_release(&new_ctx->ctx);
4924
4925 return NULL;
4926}
4927
4928

--- 100 unchanged lines hidden (view full) ---

5029 port_sockets[0], port_sockets[1]);
5030
5031 pthread_mutex_lock(&lib->mutex);
5032
5033 process = nxt_unit_process_get(lib, lib->pid);
5034 if (nxt_slow_path(process == NULL)) {
5035 pthread_mutex_unlock(&lib->mutex);
5036
4922 }
4923
4924 nxt_unit_ctx_release(&new_ctx->ctx);
4925
4926 return NULL;
4927}
4928
4929

--- 100 unchanged lines hidden (view full) ---

5030 port_sockets[0], port_sockets[1]);
5031
5032 pthread_mutex_lock(&lib->mutex);
5033
5034 process = nxt_unit_process_get(lib, lib->pid);
5035 if (nxt_slow_path(process == NULL)) {
5036 pthread_mutex_unlock(&lib->mutex);
5037
5037 close(port_sockets[0]);
5038 close(port_sockets[1]);
5038 nxt_unit_close(port_sockets[0]);
5039 nxt_unit_close(port_sockets[1]);
5039
5040 return NULL;
5041 }
5042
5043 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5044
5045 new_port.in_fd = port_sockets[0];
5046 new_port.out_fd = port_sockets[1];
5047 new_port.data = NULL;
5048
5049 pthread_mutex_unlock(&lib->mutex);
5050
5051 nxt_unit_process_release(process);
5052
5053 port = nxt_unit_add_port(ctx, &new_port, NULL);
5054 if (nxt_slow_path(port == NULL)) {
5040
5041 return NULL;
5042 }
5043
5044 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5045
5046 new_port.in_fd = port_sockets[0];
5047 new_port.out_fd = port_sockets[1];
5048 new_port.data = NULL;
5049
5050 pthread_mutex_unlock(&lib->mutex);
5051
5052 nxt_unit_process_release(process);
5053
5054 port = nxt_unit_add_port(ctx, &new_port, NULL);
5055 if (nxt_slow_path(port == NULL)) {
5055 close(port_sockets[0]);
5056 close(port_sockets[1]);
5056 nxt_unit_close(port_sockets[0]);
5057 nxt_unit_close(port_sockets[1]);
5057 }
5058
5059 return port;
5060}
5061
5062
5063static int
5064nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,

--- 69 unchanged lines hidden (view full) ---

5134 long c;
5135 nxt_unit_port_impl_t *port_impl;
5136
5137 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5138
5139 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5140
5141 if (c == 1) {
5058 }
5059
5060 return port;
5061}
5062
5063
5064static int
5065nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,

--- 69 unchanged lines hidden (view full) ---

5135 long c;
5136 nxt_unit_port_impl_t *port_impl;
5137
5138 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5139
5140 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5141
5142 if (c == 1) {
5142 nxt_unit_debug(NULL, "destroy port{%d,%d}",
5143 (int) port->id.pid, (int) port->id.id);
5143 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5144 (int) port->id.pid, (int) port->id.id,
5145 port->in_fd, port->out_fd);
5144
5145 nxt_unit_process_release(port_impl->process);
5146
5147 if (port->in_fd != -1) {
5146
5147 nxt_unit_process_release(port_impl->process);
5148
5149 if (port->in_fd != -1) {
5148 close(port->in_fd);
5150 nxt_unit_close(port->in_fd);
5149
5150 port->in_fd = -1;
5151 }
5152
5153 if (port->out_fd != -1) {
5151
5152 port->in_fd = -1;
5153 }
5154
5155 if (port->out_fd != -1) {
5154 close(port->out_fd);
5156 nxt_unit_close(port->out_fd);
5155
5156 port->out_fd = -1;
5157 }
5158
5157
5158 port->out_fd = -1;
5159 }
5160
5159 if (port->in_fd != -1) {
5160 close(port->in_fd);
5161
5162 port->in_fd = -1;
5163 }
5164
5165 if (port->out_fd != -1) {
5166 close(port->out_fd);
5167
5168 port->out_fd = -1;
5169 }
5170
5171 if (port_impl->queue != NULL) {
5172 munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
5173 ? sizeof(nxt_app_queue_t)
5174 : sizeof(nxt_port_queue_t));
5175 }
5176
5177 free(port_impl);
5178 }

--- 30 unchanged lines hidden (view full) ---

5209 }
5210
5211 if (old_port->in_fd == -1) {
5212 old_port->in_fd = port->in_fd;
5213 port->in_fd = -1;
5214 }
5215
5216 if (port->in_fd != -1) {
5161 if (port_impl->queue != NULL) {
5162 munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
5163 ? sizeof(nxt_app_queue_t)
5164 : sizeof(nxt_port_queue_t));
5165 }
5166
5167 free(port_impl);
5168 }

--- 30 unchanged lines hidden (view full) ---

5199 }
5200
5201 if (old_port->in_fd == -1) {
5202 old_port->in_fd = port->in_fd;
5203 port->in_fd = -1;
5204 }
5205
5206 if (port->in_fd != -1) {
5217 close(port->in_fd);
5207 nxt_unit_close(port->in_fd);
5218 port->in_fd = -1;
5219 }
5220
5221 if (old_port->out_fd == -1) {
5222 old_port->out_fd = port->out_fd;
5223 port->out_fd = -1;
5224 }
5225
5226 if (port->out_fd != -1) {
5208 port->in_fd = -1;
5209 }
5210
5211 if (old_port->out_fd == -1) {
5212 old_port->out_fd = port->out_fd;
5213 port->out_fd = -1;
5214 }
5215
5216 if (port->out_fd != -1) {
5227 close(port->out_fd);
5217 nxt_unit_close(port->out_fd);
5228 port->out_fd = -1;
5229 }
5230
5231 *port = *old_port;
5232
5233 nxt_queue_init(&awaiting_req);
5234
5235 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);

--- 669 unchanged lines hidden (view full) ---

5905
5906 goto retry;
5907 }
5908
5909 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5910}
5911
5912
5218 port->out_fd = -1;
5219 }
5220
5221 *port = *old_port;
5222
5223 nxt_queue_init(&awaiting_req);
5224
5225 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);

--- 669 unchanged lines hidden (view full) ---

5895
5896 goto retry;
5897 }
5898
5899 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5900}
5901
5902
5903nxt_inline int
5904nxt_unit_close(int fd)
5905{
5906 int res;
5907
5908 res = close(fd);
5909
5910 if (nxt_slow_path(res == -1)) {
5911 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
5912 fd, strerror(errno), errno);
5913
5914 } else {
5915 nxt_unit_debug(NULL, "close(%d): %d", fd, res);
5916 }
5917
5918 return res;
5919}
5920
5921
5913static nxt_int_t
5914nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5915{
5916 nxt_unit_port_t *port;
5917 nxt_unit_port_hash_id_t *port_id;
5918
5919 port = data;
5920 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 371 unchanged lines hidden ---
5922static nxt_int_t
5923nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5924{
5925 nxt_unit_port_t *port;
5926 nxt_unit_port_hash_id_t *port_id;
5927
5928 port = data;
5929 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 371 unchanged lines hidden ---