Deleted
Added
nxt_unit.c (1008:84f2370bd642) | nxt_unit.c (1131:ec7d924d8dfb) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" 9#include "nxt_port_memory_int.h" 10 11#include "nxt_unit.h" 12#include "nxt_unit_request.h" 13#include "nxt_unit_response.h" | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" 9#include "nxt_port_memory_int.h" 10 11#include "nxt_unit.h" 12#include "nxt_unit_request.h" 13#include "nxt_unit_response.h" |
14#include "nxt_unit_websocket.h" |
|
14 | 15 |
16#include "nxt_websocket.h" 17 |
|
15#if (NXT_HAVE_MEMFD_CREATE) 16#include <linux/memfd.h> 17#endif 18 | 18#if (NXT_HAVE_MEMFD_CREATE) 19#include <linux/memfd.h> 20#endif 21 |
19typedef struct nxt_unit_impl_s nxt_unit_impl_t; 20typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 21typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 22typedef struct nxt_unit_process_s nxt_unit_process_t; 23typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 24typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 25typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 26typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 27typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; | 22typedef struct nxt_unit_impl_s nxt_unit_impl_t; 23typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 24typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 25typedef struct nxt_unit_process_s nxt_unit_process_t; 26typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 27typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 28typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 29typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 30typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 31typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; |
28 29static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 30static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, 31 nxt_unit_ctx_impl_t *ctx_impl, void *data); | 32 33static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 34static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, 35 nxt_unit_ctx_impl_t *ctx_impl, void *data); |
36nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 37 nxt_unit_mmap_buf_t *mmap_buf); 38nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 39 nxt_unit_mmap_buf_t *mmap_buf); 40nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); |
|
32static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 33 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); 34static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 35 uint32_t stream); | 41static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 42 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); 43static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 44 uint32_t stream); |
45static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 46 nxt_unit_recv_msg_t *recv_msg); 47static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 48 nxt_unit_recv_msg_t *recv_msg); 49static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 50 nxt_unit_recv_msg_t *recv_msg); |
|
36static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 37 nxt_unit_ctx_t *ctx); 38static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 39static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); | 51static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 52 nxt_unit_ctx_t *ctx); 53static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 54static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); |
55static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 56 nxt_unit_ctx_t *ctx); 57static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 58static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); |
|
40static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 41 nxt_unit_recv_msg_t *recv_msg); 42static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 43static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 44static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 45 nxt_unit_mmap_buf_t *mmap_buf, int last); | 59static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 62static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 63static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 64 nxt_unit_mmap_buf_t *mmap_buf, int last); |
65static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 66static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 67 size_t size); |
|
46static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 47 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 48 nxt_chunk_id_t *c, int n); 49static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 50static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 51 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 52static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 53 int fd); --- 6 unchanged lines hidden (view full) --- 60static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 61 nxt_unit_process_t *process, int i); 62static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 63static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 64 nxt_unit_process_t *process, uint32_t id); 65static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 66 nxt_unit_recv_msg_t *recv_msg); 67static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, | 68static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 69 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 70 nxt_chunk_id_t *c, int n); 71static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 72static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 73 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 74static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 75 int fd); --- 6 unchanged lines hidden (view full) --- 82static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 83 nxt_unit_process_t *process, int i); 84static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 85static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 86 nxt_unit_process_t *process, uint32_t id); 87static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 88 nxt_unit_recv_msg_t *recv_msg); 89static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, |
68 nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf); | 90 nxt_unit_recv_msg_t *recv_msg); |
69static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, 70 uint32_t size); 71 72static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 73 pid_t pid); 74static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 75 pid_t pid, int remove); 76static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); --- 16 unchanged lines hidden (view full) --- 93 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 94 void *oob, size_t oob_size); 95 96static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 97 nxt_unit_port_t *port); 98static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 99 nxt_unit_port_id_t *port_id, int remove); 100 | 91static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, 92 uint32_t size); 93 94static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 95 pid_t pid); 96static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 97 pid_t pid, int remove); 98static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); --- 16 unchanged lines hidden (view full) --- 115 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 116 void *oob, size_t oob_size); 117 118static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 119 nxt_unit_port_t *port); 120static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 121 nxt_unit_port_id_t *port_id, int remove); 122 |
123static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 124 nxt_unit_request_info_impl_t *req_impl); 125static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 126 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 127 |
|
101static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 102 103 104struct nxt_unit_mmap_buf_s { 105 nxt_unit_buf_t buf; 106 | 128static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 129 130 131struct nxt_unit_mmap_buf_s { 132 nxt_unit_buf_t buf; 133 |
134 nxt_unit_mmap_buf_t *next; 135 nxt_unit_mmap_buf_t **prev; 136 |
|
107 nxt_port_mmap_header_t *hdr; | 137 nxt_port_mmap_header_t *hdr; |
108 nxt_queue_link_t link; | 138// nxt_queue_link_t link; |
109 nxt_unit_port_id_t port_id; 110 nxt_unit_request_info_t *req; 111 nxt_unit_ctx_impl_t *ctx_impl; 112}; 113 114 115struct nxt_unit_recv_msg_s { | 139 nxt_unit_port_id_t port_id; 140 nxt_unit_request_info_t *req; 141 nxt_unit_ctx_impl_t *ctx_impl; 142}; 143 144 145struct nxt_unit_recv_msg_s { |
116 nxt_port_msg_t port_msg; | 146 uint32_t stream; 147 nxt_pid_t pid; 148 nxt_port_id_t reply_port; |
117 | 149 |
150 uint8_t last; /* 1 bit */ 151 uint8_t mmap; /* 1 bit */ 152 |
|
118 void *start; 119 uint32_t size; 120 | 153 void *start; 154 uint32_t size; 155 |
156 int fd; |
|
121 nxt_unit_process_t *process; | 157 nxt_unit_process_t *process; |
158 159 nxt_unit_mmap_buf_t *incoming_buf; |
|
122}; 123 124 125typedef enum { 126 NXT_UNIT_RS_START = 0, 127 NXT_UNIT_RS_RESPONSE_INIT, 128 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 129 NXT_UNIT_RS_RESPONSE_SENT, | 160}; 161 162 163typedef enum { 164 NXT_UNIT_RS_START = 0, 165 NXT_UNIT_RS_RESPONSE_INIT, 166 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 167 NXT_UNIT_RS_RESPONSE_SENT, |
130 NXT_UNIT_RS_DONE, | 168 NXT_UNIT_RS_RELEASED, |
131} nxt_unit_req_state_t; 132 133 134struct nxt_unit_request_info_impl_s { 135 nxt_unit_request_info_t req; 136 | 169} nxt_unit_req_state_t; 170 171 172struct nxt_unit_request_info_impl_s { 173 nxt_unit_request_info_t req; 174 |
137 nxt_unit_recv_msg_t recv_msg; 138 nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */ 139 nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */ | 175 uint32_t stream; |
140 | 176 |
177 nxt_unit_process_t *process; 178 179 nxt_unit_mmap_buf_t *outgoing_buf; 180 nxt_unit_mmap_buf_t *incoming_buf; 181 |
|
141 nxt_unit_req_state_t state; | 182 nxt_unit_req_state_t state; |
183 uint8_t websocket; |
|
142 143 nxt_queue_link_t link; 144 145 char extra_data[]; 146}; 147 148 | 184 185 nxt_queue_link_t link; 186 187 char extra_data[]; 188}; 189 190 |
191struct nxt_unit_websocket_frame_impl_s { 192 nxt_unit_websocket_frame_t ws; 193 194 nxt_unit_mmap_buf_t *buf; 195 196 nxt_queue_link_t link; 197 198 nxt_unit_ctx_impl_t *ctx_impl; 199 200 void *retain_buf; 201}; 202 203 |
|
149struct nxt_unit_ctx_impl_s { 150 nxt_unit_ctx_t ctx; 151 152 nxt_unit_port_id_t read_port_id; 153 int read_port_fd; 154 155 nxt_queue_link_t link; 156 | 204struct nxt_unit_ctx_impl_s { 205 nxt_unit_ctx_t ctx; 206 207 nxt_unit_port_id_t read_port_id; 208 int read_port_fd; 209 210 nxt_queue_link_t link; 211 |
157 nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */ | 212 nxt_unit_mmap_buf_t *free_buf; |
158 159 /* of nxt_unit_request_info_impl_t */ 160 nxt_queue_t free_req; 161 | 213 214 /* of nxt_unit_request_info_impl_t */ 215 nxt_queue_t free_req; 216 |
217 /* of nxt_unit_websocket_frame_impl_t */ 218 nxt_queue_t free_ws; 219 |
|
162 /* of nxt_unit_request_info_impl_t */ 163 nxt_queue_t active_req; 164 | 220 /* of nxt_unit_request_info_impl_t */ 221 nxt_queue_t active_req; 222 |
223 /* of nxt_unit_request_info_impl_t */ 224 nxt_lvlhsh_t requests; 225 |
|
165 nxt_unit_mmap_buf_t ctx_buf[2]; 166 167 nxt_unit_request_info_impl_t req; 168}; 169 170 171struct nxt_unit_impl_s { 172 nxt_unit_t unit; --- 216 unchanged lines hidden (view full) --- 389nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 390 void *data) 391{ 392 ctx_impl->ctx.data = data; 393 ctx_impl->ctx.unit = &lib->unit; 394 395 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 396 | 226 nxt_unit_mmap_buf_t ctx_buf[2]; 227 228 nxt_unit_request_info_impl_t req; 229}; 230 231 232struct nxt_unit_impl_s { 233 nxt_unit_t unit; --- 216 unchanged lines hidden (view full) --- 450nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 451 void *data) 452{ 453 ctx_impl->ctx.data = data; 454 ctx_impl->ctx.unit = &lib->unit; 455 456 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 457 |
397 nxt_queue_init(&ctx_impl->free_buf); | |
398 nxt_queue_init(&ctx_impl->free_req); | 458 nxt_queue_init(&ctx_impl->free_req); |
459 nxt_queue_init(&ctx_impl->free_ws); |
|
399 nxt_queue_init(&ctx_impl->active_req); 400 | 460 nxt_queue_init(&ctx_impl->active_req); 461 |
401 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link); 402 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link); | 462 ctx_impl->free_buf = NULL; 463 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 464 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 465 |
403 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 404 405 ctx_impl->req.req.ctx = &ctx_impl->ctx; 406 ctx_impl->req.req.unit = &lib->unit; 407 408 ctx_impl->read_port_fd = -1; | 466 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 467 468 ctx_impl->req.req.ctx = &ctx_impl->ctx; 469 ctx_impl->req.req.unit = &lib->unit; 470 471 ctx_impl->read_port_fd = -1; |
472 ctx_impl->requests.slot = 0; |
|
409} 410 411 | 473} 474 475 |
476nxt_inline void 477nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 478 nxt_unit_mmap_buf_t *mmap_buf) 479{ 480 mmap_buf->next = *head; 481 482 if (mmap_buf->next != NULL) { 483 mmap_buf->next->prev = &mmap_buf->next; 484 } 485 486 *head = mmap_buf; 487 mmap_buf->prev = head; 488} 489 490 491nxt_inline void 492nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 493 nxt_unit_mmap_buf_t *mmap_buf) 494{ 495 while (*prev != NULL) { 496 prev = &(*prev)->next; 497 } 498 499 nxt_unit_mmap_buf_insert(prev, mmap_buf); 500} 501 502 503nxt_inline void 504nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) 505{ 506 nxt_unit_mmap_buf_t **prev; 507 508 prev = mmap_buf->prev; 509 510 if (mmap_buf->next != NULL) { 511 mmap_buf->next->prev = prev; 512 } 513 514 if (prev != NULL) { 515 *prev = mmap_buf->next; 516 } 517} 518 519 |
|
412static int 413nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, 414 int *log_fd, uint32_t *stream) 415{ 416 int rc; 417 int ready_fd, read_fd; 418 char *unit_init, *version_end; 419 long version_length; --- 84 unchanged lines hidden (view full) --- 504 return NXT_UNIT_OK; 505} 506 507 508int 509nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 510 void *buf, size_t buf_size, void *oob, size_t oob_size) 511{ | 520static int 521nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, 522 int *log_fd, uint32_t *stream) 523{ 524 int rc; 525 int ready_fd, read_fd; 526 char *unit_init, *version_end; 527 long version_length; --- 84 unchanged lines hidden (view full) --- 612 return NXT_UNIT_OK; 613} 614 615 616int 617nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 618 void *buf, size_t buf_size, void *oob, size_t oob_size) 619{ |
512 int fd, rc, nb; 513 pid_t pid; 514 nxt_queue_t incoming_buf; 515 struct cmsghdr *cm; 516 nxt_port_msg_t *port_msg; 517 nxt_unit_impl_t *lib; 518 nxt_unit_port_t new_port; 519 nxt_queue_link_t *lnk; 520 nxt_unit_request_t *r; 521 nxt_unit_mmap_buf_t *b; 522 nxt_unit_recv_msg_t recv_msg; 523 nxt_unit_callbacks_t *cb; 524 nxt_port_msg_new_port_t *new_port_msg; 525 nxt_unit_request_info_t *req; 526 nxt_unit_request_info_impl_t *req_impl; | 620 int rc; 621 pid_t pid; 622 struct cmsghdr *cm; 623 nxt_port_msg_t *port_msg; 624 nxt_unit_impl_t *lib; 625 nxt_unit_recv_msg_t recv_msg; 626 nxt_unit_callbacks_t *cb; |
527 528 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 529 530 rc = NXT_UNIT_ERROR; | 627 628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 629 630 rc = NXT_UNIT_ERROR; |
531 fd = -1; | 631 recv_msg.fd = -1; |
532 recv_msg.process = NULL; 533 port_msg = buf; 534 cm = oob; 535 536 if (oob_size >= CMSG_SPACE(sizeof(int)) 537 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 538 && cm->cmsg_level == SOL_SOCKET 539 && cm->cmsg_type == SCM_RIGHTS) 540 { | 632 recv_msg.process = NULL; 633 port_msg = buf; 634 cm = oob; 635 636 if (oob_size >= CMSG_SPACE(sizeof(int)) 637 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 638 && cm->cmsg_level == SOL_SOCKET 639 && cm->cmsg_type == SCM_RIGHTS) 640 { |
541 memcpy(&fd, CMSG_DATA(cm), sizeof(int)); | 641 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); |
542 } 543 | 642 } 643 |
544 nxt_queue_init(&incoming_buf); | 644 recv_msg.incoming_buf = NULL; |
545 546 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 547 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 548 goto fail; 549 } 550 | 645 646 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 647 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 648 goto fail; 649 } 650 |
551 recv_msg.port_msg = *port_msg; | 651 recv_msg.stream = port_msg->stream; 652 recv_msg.pid = port_msg->pid; 653 recv_msg.reply_port = port_msg->reply_port; 654 recv_msg.last = port_msg->last; 655 recv_msg.mmap = port_msg->mmap; 656 |
552 recv_msg.start = port_msg + 1; 553 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 554 555 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 556 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 557 port_msg->stream, (int) port_msg->type); 558 goto fail; 559 } --- 7 unchanged lines hidden (view full) --- 567 /* Fragmentation is unsupported. */ 568 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 569 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 570 port_msg->stream, (int) port_msg->type); 571 goto fail; 572 } 573 574 if (port_msg->mmap) { | 657 recv_msg.start = port_msg + 1; 658 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 659 660 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 661 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 662 port_msg->stream, (int) port_msg->type); 663 goto fail; 664 } --- 7 unchanged lines hidden (view full) --- 672 /* Fragmentation is unsupported. */ 673 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 674 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 675 port_msg->stream, (int) port_msg->type); 676 goto fail; 677 } 678 679 if (port_msg->mmap) { |
575 if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) { | 680 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { |
576 goto fail; 577 } 578 } 579 580 cb = &lib->callbacks; 581 582 switch (port_msg->type) { 583 584 case _NXT_PORT_MSG_QUIT: 585 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 586 587 cb->quit(ctx); 588 rc = NXT_UNIT_OK; 589 break; 590 591 case _NXT_PORT_MSG_NEW_PORT: | 681 goto fail; 682 } 683 } 684 685 cb = &lib->callbacks; 686 687 switch (port_msg->type) { 688 689 case _NXT_PORT_MSG_QUIT: 690 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 691 692 cb->quit(ctx); 693 rc = NXT_UNIT_OK; 694 break; 695 696 case _NXT_PORT_MSG_NEW_PORT: |
592 if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) { 593 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 594 "invalid message size (%d)", 595 port_msg->stream, (int) recv_msg.size); | 697 rc = nxt_unit_process_new_port(ctx, &recv_msg); 698 break; |
596 | 699 |
597 goto fail; 598 } | 700 case _NXT_PORT_MSG_CHANGE_FILE: 701 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 702 port_msg->stream, recv_msg.fd); 703 break; |
599 | 704 |
600 if (nxt_slow_path(fd < 0)) { 601 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 602 port_msg->stream, fd); | 705 case _NXT_PORT_MSG_MMAP: 706 if (nxt_slow_path(recv_msg.fd < 0)) { 707 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 708 port_msg->stream, recv_msg.fd); |
603 604 goto fail; 605 } 606 | 709 710 goto fail; 711 } 712 |
607 new_port_msg = recv_msg.start; | 713 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); 714 break; |
608 | 715 |
609 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 610 port_msg->stream, (int) new_port_msg->pid, 611 (int) new_port_msg->id, fd); | 716 case _NXT_PORT_MSG_REQ_HEADERS: 717 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 718 break; |
612 | 719 |
613 nb = 0; | 720 case _NXT_PORT_MSG_WEBSOCKET: 721 rc = nxt_unit_process_websocket(ctx, &recv_msg); 722 break; |
614 | 723 |
615 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { 616 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 617 "failed: %s (%d)", fd, strerror(errno), errno); | 724 case _NXT_PORT_MSG_REMOVE_PID: 725 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 726 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 727 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 728 (int) sizeof(pid)); |
618 619 goto fail; 620 } 621 | 729 730 goto fail; 731 } 732 |
622 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 623 new_port_msg->id); | 733 memcpy(&pid, recv_msg.start, sizeof(pid)); |
624 | 734 |
625 new_port.in_fd = -1; 626 new_port.out_fd = fd; 627 new_port.data = NULL; | 735 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 736 port_msg->stream, (int) pid); |
628 | 737 |
629 fd = -1; | 738 cb->remove_pid(ctx, pid); |
630 | 739 |
631 rc = cb->add_port(ctx, &new_port); | 740 rc = NXT_UNIT_OK; |
632 break; 633 | 741 break; 742 |
634 case _NXT_PORT_MSG_CHANGE_FILE: 635 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 636 port_msg->stream, fd); 637 break; | 743 default: 744 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 745 port_msg->stream, (int) port_msg->type); |
638 | 746 |
639 case _NXT_PORT_MSG_MMAP: 640 if (nxt_slow_path(fd < 0)) { 641 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 642 port_msg->stream, fd); | 747 goto fail; 748 } |
643 | 749 |
644 goto fail; 645 } | 750fail: |
646 | 751 |
647 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd); 648 break; | 752 if (recv_msg.fd != -1) { 753 close(recv_msg.fd); 754 } |
649 | 755 |
650 case _NXT_PORT_MSG_DATA: 651 if (nxt_slow_path(port_msg->mmap == 0)) { 652 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 653 port_msg->stream); | 756 while (recv_msg.incoming_buf != NULL) { 757 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 758 } |
654 | 759 |
655 goto fail; 656 } | 760 if (recv_msg.process != NULL) { 761 nxt_unit_process_use(ctx, recv_msg.process, -1); 762 } |
657 | 763 |
658 if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) { 659 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 660 "%d expected", port_msg->stream, (int) recv_msg.size, 661 (int) sizeof(nxt_unit_request_t)); | 764 return rc; 765} |
662 | 766 |
663 goto fail; 664 } | |
665 | 767 |
666 req_impl = nxt_unit_request_info_get(ctx); 667 if (nxt_slow_path(req_impl == NULL)) { 668 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 669 port_msg->stream); | 768static int 769nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 770{ 771 int nb; 772 nxt_unit_impl_t *lib; 773 nxt_unit_port_t new_port; 774 nxt_port_msg_new_port_t *new_port_msg; |
670 | 775 |
671 goto fail; 672 } | 776 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 777 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 778 "invalid message size (%d)", 779 recv_msg->stream, (int) recv_msg->size); |
673 | 780 |
674 req = &req_impl->req; | 781 return NXT_UNIT_ERROR; 782 } |
675 | 783 |
676 req->request_port = *port_id; | 784 if (nxt_slow_path(recv_msg->fd < 0)) { 785 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 786 recv_msg->stream, recv_msg->fd); |
677 | 787 |
678 nxt_unit_port_id_init(&req->response_port, port_msg->pid, 679 port_msg->reply_port); | 788 return NXT_UNIT_ERROR; 789 } |
680 | 790 |
681 req->request = recv_msg.start; | 791 new_port_msg = recv_msg->start; |
682 | 792 |
683 lnk = nxt_queue_first(&incoming_buf); 684 b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); | 793 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 794 recv_msg->stream, (int) new_port_msg->pid, 795 (int) new_port_msg->id, recv_msg->fd); |
685 | 796 |
686 req->request_buf = &b->buf; 687 req->response = NULL; 688 req->response_buf = NULL; | 797 nb = 0; |
689 | 798 |
690 r = req->request; | 799 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { 800 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 801 "failed: %s (%d)", recv_msg->fd, strerror(errno), errno); |
691 | 802 |
692 req->content_length = r->content_length; | 803 return NXT_UNIT_ERROR; 804 } |
693 | 805 |
694 req->content_buf = req->request_buf; 695 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); | 806 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 807 new_port_msg->id); |
696 | 808 |
697 /* Move process to req_impl. */ 698 req_impl->recv_msg = recv_msg; | 809 new_port.in_fd = -1; 810 new_port.out_fd = recv_msg->fd; 811 new_port.data = NULL; |
699 | 812 |
700 recv_msg.process = NULL; | 813 recv_msg->fd = -1; |
701 | 814 |
702 nxt_queue_init(&req_impl->outgoing_buf); 703 nxt_queue_init(&req_impl->incoming_buf); | 815 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); |
704 | 816 |
705 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) 706 { 707 b->req = req; 708 } nxt_queue_loop; | 817 return lib->callbacks.add_port(ctx, &new_port); 818} |
709 | 819 |
710 nxt_queue_add(&req_impl->incoming_buf, &incoming_buf); 711 nxt_queue_init(&incoming_buf); | |
712 | 820 |
713 req->response_max_fields = 0; 714 req_impl->state = NXT_UNIT_RS_START; | 821static int 822nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 823{ 824 nxt_unit_impl_t *lib; 825 nxt_unit_request_t *r; 826 nxt_unit_mmap_buf_t *b; 827 nxt_unit_request_info_t *req; 828 nxt_unit_request_info_impl_t *req_impl; |
715 | 829 |
716 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream, 717 (int) r->method_length, nxt_unit_sptr_get(&r->method), 718 (int) r->target_length, nxt_unit_sptr_get(&r->target), 719 (int) r->content_length); | 830 if (nxt_slow_path(recv_msg->mmap == 0)) { 831 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 832 recv_msg->stream); |
720 | 833 |
721 cb->request_handler(req); | 834 return NXT_UNIT_ERROR; 835 } |
722 | 836 |
723 rc = NXT_UNIT_OK; 724 break; | 837 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 838 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 839 "%d expected", recv_msg->stream, (int) recv_msg->size, 840 (int) sizeof(nxt_unit_request_t)); |
725 | 841 |
726 case _NXT_PORT_MSG_REMOVE_PID: 727 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 728 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 729 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 730 (int) sizeof(pid)); | 842 return NXT_UNIT_ERROR; 843 } |
731 | 844 |
732 goto fail; 733 } | 845 req_impl = nxt_unit_request_info_get(ctx); 846 if (nxt_slow_path(req_impl == NULL)) { 847 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 848 recv_msg->stream); |
734 | 849 |
735 memcpy(&pid, recv_msg.start, sizeof(pid)); | 850 return NXT_UNIT_ERROR; 851 } |
736 | 852 |
737 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 738 port_msg->stream, (int) pid); | 853 req = &req_impl->req; |
739 | 854 |
740 cb->remove_pid(ctx, pid); | 855 nxt_unit_port_id_init(&req->response_port, recv_msg->pid, 856 recv_msg->reply_port); |
741 | 857 |
742 rc = NXT_UNIT_OK; 743 break; | 858 req->request = recv_msg->start; |
744 | 859 |
745 default: 746 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 747 port_msg->stream, (int) port_msg->type); | 860 b = recv_msg->incoming_buf; |
748 | 861 |
749 goto fail; | 862 req->request_buf = &b->buf; 863 req->response = NULL; 864 req->response_buf = NULL; 865 866 r = req->request; 867 868 req->content_length = r->content_length; 869 870 req->content_buf = req->request_buf; 871 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 872 873 /* "Move" process reference to req_impl. */ 874 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); 875 if (nxt_slow_path(req_impl->process == NULL)) { 876 return NXT_UNIT_ERROR; |
750 } 751 | 877 } 878 |
752fail: | 879 recv_msg->process = NULL; |
753 | 880 |
754 if (fd != -1) { 755 close(fd); | 881 req_impl->stream = recv_msg->stream; 882 883 req_impl->outgoing_buf = NULL; 884 885 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 886 b->req = req; |
756 } 757 | 887 } 888 |
758 if (port_msg->mmap) { 759 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) 760 { 761 nxt_unit_mmap_release(b->hdr, b->buf.start, 762 b->buf.end - b->buf.start); | 889 /* "Move" incoming buffer list to req_impl. */ 890 req_impl->incoming_buf = recv_msg->incoming_buf; 891 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 892 recv_msg->incoming_buf = NULL; |
763 | 893 |
764 nxt_unit_mmap_buf_release(b); 765 } nxt_queue_loop; | 894 req->response_max_fields = 0; 895 req_impl->state = NXT_UNIT_RS_START; 896 req_impl->websocket = 0; 897 898 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 899 (int) r->method_length, nxt_unit_sptr_get(&r->method), 900 (int) r->target_length, nxt_unit_sptr_get(&r->target), 901 (int) r->content_length); 902 903 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 904 905 lib->callbacks.request_handler(req); 906 907 return NXT_UNIT_OK; 908} 909 910 911static int 912nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 913{ 914 size_t hsize; 915 nxt_unit_impl_t *lib; 916 nxt_unit_mmap_buf_t *b; 917 nxt_unit_ctx_impl_t *ctx_impl; 918 nxt_unit_callbacks_t *cb; 919 nxt_unit_request_info_t *req; 920 nxt_unit_request_info_impl_t *req_impl; 921 nxt_unit_websocket_frame_impl_t *ws_impl; 922 923 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 924 925 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, 926 recv_msg->last); 927 if (req_impl == NULL) { 928 return NXT_UNIT_OK; |
766 } 767 | 929 } 930 |
768 if (recv_msg.process != NULL) { 769 nxt_unit_process_use(ctx, recv_msg.process, -1); | 931 req = &req_impl->req; 932 933 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 934 cb = &lib->callbacks; 935 936 if (cb->websocket_handler && recv_msg->size >= 2) { 937 ws_impl = nxt_unit_websocket_frame_get(ctx); 938 if (nxt_slow_path(ws_impl == NULL)) { 939 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 940 req_impl->stream); 941 942 return NXT_UNIT_ERROR; 943 } 944 945 ws_impl->ws.req = req; 946 947 ws_impl->buf = NULL; 948 ws_impl->retain_buf = NULL; 949 950 if (recv_msg->mmap) { 951 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 952 b->req = req; 953 } 954 955 /* "Move" incoming buffer list to ws_impl. */ 956 ws_impl->buf = recv_msg->incoming_buf; 957 ws_impl->buf->prev = &ws_impl->buf; 958 recv_msg->incoming_buf = NULL; 959 960 b = ws_impl->buf; 961 962 } else { 963 b = nxt_unit_mmap_buf_get(ctx); 964 if (nxt_slow_path(b == NULL)) { 965 return NXT_UNIT_ERROR; 966 } 967 968 b->hdr = NULL; 969 b->req = req; 970 b->buf.start = recv_msg->start; 971 b->buf.free = b->buf.start; 972 b->buf.end = b->buf.start + recv_msg->size; 973 974 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 975 } 976 977 ws_impl->ws.header = (void *) b->buf.start; 978 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 979 ws_impl->ws.header); 980 981 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 982 983 if (ws_impl->ws.header->mask) { 984 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 985 986 } else { 987 ws_impl->ws.mask = NULL; 988 } 989 990 b->buf.free += hsize; 991 992 ws_impl->ws.content_buf = &b->buf; 993 ws_impl->ws.content_length = ws_impl->ws.payload_len; 994 995 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 996 "payload_len=%"PRIu64, 997 ws_impl->ws.header->opcode, 998 ws_impl->ws.payload_len); 999 1000 cb->websocket_handler(&ws_impl->ws); |
770 } 771 | 1001 } 1002 |
772 return rc; | 1003 if (recv_msg->last) { 1004 req_impl->websocket = 0; 1005 1006 if (cb->close_handler) { 1007 nxt_unit_req_debug(req, "close_handler"); 1008 1009 cb->close_handler(req); 1010 1011 } else { 1012 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1013 } 1014 } 1015 1016 return NXT_UNIT_OK; |
773} 774 775 776static nxt_unit_request_info_impl_t * 777nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 778{ 779 nxt_unit_impl_t *lib; 780 nxt_queue_link_t *lnk; --- 29 unchanged lines hidden (view full) --- 810 811 return req_impl; 812} 813 814 815static void 816nxt_unit_request_info_release(nxt_unit_request_info_t *req) 817{ | 1017} 1018 1019 1020static nxt_unit_request_info_impl_t * 1021nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1022{ 1023 nxt_unit_impl_t *lib; 1024 nxt_queue_link_t *lnk; --- 29 unchanged lines hidden (view full) --- 1054 1055 return req_impl; 1056} 1057 1058 1059static void 1060nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1061{ |
818 nxt_unit_mmap_buf_t *b; | |
819 nxt_unit_ctx_impl_t *ctx_impl; | 1062 nxt_unit_ctx_impl_t *ctx_impl; |
820 nxt_unit_recv_msg_t *recv_msg; | |
821 nxt_unit_request_info_impl_t *req_impl; 822 823 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 824 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 825 826 req->response = NULL; 827 req->response_buf = NULL; 828 | 1063 nxt_unit_request_info_impl_t *req_impl; 1064 1065 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1066 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1067 1068 req->response = NULL; 1069 req->response_buf = NULL; 1070 |
829 recv_msg = &req_impl->recv_msg; | 1071 if (req_impl->process != NULL) { 1072 nxt_unit_process_use(req->ctx, req_impl->process, -1); |
830 | 1073 |
831 if (recv_msg->process != NULL) { 832 nxt_unit_process_use(req->ctx, recv_msg->process, -1); 833 834 recv_msg->process = NULL; | 1074 req_impl->process = NULL; |
835 } 836 | 1075 } 1076 |
837 nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) { | 1077 if (req_impl->websocket) { 1078 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); |
838 | 1079 |
839 nxt_unit_buf_free(&b->buf); | 1080 req_impl->websocket = 0; 1081 } |
840 | 1082 |
841 } nxt_queue_loop; | 1083 while (req_impl->outgoing_buf != NULL) { 1084 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1085 } |
842 | 1086 |
843 nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) { | 1087 while (req_impl->incoming_buf != NULL) { 1088 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1089 } |
844 | 1090 |
845 nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start); 846 nxt_unit_mmap_buf_release(b); 847 848 } nxt_queue_loop; 849 | |
850 nxt_queue_remove(&req_impl->link); 851 852 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); | 1091 nxt_queue_remove(&req_impl->link); 1092 1093 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); |
1094 1095 req_impl->state = NXT_UNIT_RS_RELEASED; |
|
853} 854 855 856static void 857nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 858{ 859 nxt_unit_ctx_impl_t *ctx_impl; 860 861 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 862 863 nxt_queue_remove(&req_impl->link); 864 865 if (req_impl != &ctx_impl->req) { 866 free(req_impl); 867 } 868} 869 870 | 1096} 1097 1098 1099static void 1100nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1101{ 1102 nxt_unit_ctx_impl_t *ctx_impl; 1103 1104 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1105 1106 nxt_queue_remove(&req_impl->link); 1107 1108 if (req_impl != &ctx_impl->req) { 1109 free(req_impl); 1110 } 1111} 1112 1113 |
1114static nxt_unit_websocket_frame_impl_t * 1115nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1116{ 1117 nxt_queue_link_t *lnk; 1118 nxt_unit_ctx_impl_t *ctx_impl; 1119 nxt_unit_websocket_frame_impl_t *ws_impl; 1120 1121 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1122 1123 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1124 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); 1125 if (nxt_slow_path(ws_impl == NULL)) { 1126 nxt_unit_warn(ctx, "websocket frame allocation failed"); 1127 1128 return NULL; 1129 } 1130 1131 } else { 1132 lnk = nxt_queue_first(&ctx_impl->free_ws); 1133 nxt_queue_remove(lnk); 1134 1135 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1136 } 1137 1138 ws_impl->ctx_impl = ctx_impl; 1139 1140 return ws_impl; 1141} 1142 1143 1144static void 1145nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1146{ 1147 nxt_unit_websocket_frame_impl_t *ws_impl; 1148 1149 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 1150 1151 while (ws_impl->buf != NULL) { 1152 nxt_unit_mmap_buf_free(ws_impl->buf); 1153 } 1154 1155 ws->req = NULL; 1156 1157 if (ws_impl->retain_buf != NULL) { 1158 free(ws_impl->retain_buf); 1159 1160 ws_impl->retain_buf = NULL; 1161 } 1162 1163 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); 1164} 1165 1166 1167static void 1168nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) 1169{ 1170 nxt_queue_remove(&ws_impl->link); 1171 1172 free(ws_impl); 1173} 1174 1175 |
|
871uint16_t 872nxt_unit_field_hash(const char *name, size_t name_length) 873{ 874 u_char ch; 875 uint32_t hash; 876 const char *p, *end; 877 878 hash = 159406; /* Magic value copied from nxt_http_parse.c */ --- 391 unchanged lines hidden (view full) --- 1270 } 1271 1272 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1273 nxt_unit_req_warn(req, "send: response already sent"); 1274 1275 return NXT_UNIT_ERROR; 1276 } 1277 | 1176uint16_t 1177nxt_unit_field_hash(const char *name, size_t name_length) 1178{ 1179 u_char ch; 1180 uint32_t hash; 1181 const char *p, *end; 1182 1183 hash = 159406; /* Magic value copied from nxt_http_parse.c */ --- 391 unchanged lines hidden (view full) --- 1575 } 1576 1577 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1578 nxt_unit_req_warn(req, "send: response already sent"); 1579 1580 return NXT_UNIT_ERROR; 1581 } 1582 |
1583 if (req->request->websocket_handshake && req->response->status == 101) { 1584 nxt_unit_response_upgrade(req); 1585 } 1586 |
|
1278 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 1279 req->response->fields_count, 1280 (int) (req->response_buf->free 1281 - req->response_buf->start)); 1282 1283 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 1284 | 1587 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 1588 req->response->fields_count, 1589 (int) (req->response_buf->free 1590 - req->response_buf->start)); 1591 1592 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 1593 |
1285 rc = nxt_unit_mmap_buf_send(req->ctx, 1286 req_impl->recv_msg.port_msg.stream, 1287 mmap_buf, 0); | 1594 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); |
1288 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 1289 req->response = NULL; 1290 req->response_buf = NULL; 1291 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 1292 1293 nxt_unit_mmap_buf_release(mmap_buf); 1294 } 1295 --- 11 unchanged lines hidden (view full) --- 1307 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 1308} 1309 1310 1311nxt_unit_buf_t * 1312nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 1313{ 1314 int rc; | 1595 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 1596 req->response = NULL; 1597 req->response_buf = NULL; 1598 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 1599 1600 nxt_unit_mmap_buf_release(mmap_buf); 1601 } 1602 --- 11 unchanged lines hidden (view full) --- 1614 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 1615} 1616 1617 1618nxt_unit_buf_t * 1619nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 1620{ 1621 int rc; |
1315 nxt_unit_process_t *process; | |
1316 nxt_unit_mmap_buf_t *mmap_buf; 1317 nxt_unit_request_info_impl_t *req_impl; 1318 1319 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1320 nxt_unit_req_warn(req, "response_buf_alloc: " 1321 "requested buffer (%"PRIu32") too big", size); 1322 1323 return NULL; 1324 } 1325 1326 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1327 1328 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1329 | 1622 nxt_unit_mmap_buf_t *mmap_buf; 1623 nxt_unit_request_info_impl_t *req_impl; 1624 1625 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1626 nxt_unit_req_warn(req, "response_buf_alloc: " 1627 "requested buffer (%"PRIu32") too big", size); 1628 1629 return NULL; 1630 } 1631 1632 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1633 1634 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1635 |
1330 process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); 1331 if (nxt_slow_path(process == NULL)) { 1332 return NULL; 1333 } 1334 | |
1335 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1336 if (nxt_slow_path(mmap_buf == NULL)) { 1337 return NULL; 1338 } 1339 1340 mmap_buf->req = req; 1341 | 1636 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1637 if (nxt_slow_path(mmap_buf == NULL)) { 1638 return NULL; 1639 } 1640 1641 mmap_buf->req = req; 1642 |
1342 nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link); | 1643 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); |
1343 | 1644 |
1344 rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, 1345 size, mmap_buf); | 1645 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 1646 &req->response_port, size, mmap_buf); |
1346 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1347 nxt_unit_mmap_buf_release(mmap_buf); 1348 1349 return NULL; 1350 } 1351 1352 return &mmap_buf->buf; 1353} --- 7 unchanged lines hidden (view full) --- 1361 if (recv_msg->process != NULL) { 1362 return recv_msg->process; 1363 } 1364 1365 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1366 1367 pthread_mutex_lock(&lib->mutex); 1368 | 1647 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1648 nxt_unit_mmap_buf_release(mmap_buf); 1649 1650 return NULL; 1651 } 1652 1653 return &mmap_buf->buf; 1654} --- 7 unchanged lines hidden (view full) --- 1662 if (recv_msg->process != NULL) { 1663 return recv_msg->process; 1664 } 1665 1666 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1667 1668 pthread_mutex_lock(&lib->mutex); 1669 |
1369 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0); | 1670 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); |
1370 1371 pthread_mutex_unlock(&lib->mutex); 1372 1373 if (recv_msg->process == NULL) { 1374 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", | 1671 1672 pthread_mutex_unlock(&lib->mutex); 1673 1674 if (recv_msg->process == NULL) { 1675 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", |
1375 recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid); | 1676 recv_msg->stream, (int) recv_msg->pid); |
1376 } 1377 1378 return recv_msg->process; 1379} 1380 1381 1382static nxt_unit_mmap_buf_t * 1383nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1384{ | 1677 } 1678 1679 return recv_msg->process; 1680} 1681 1682 1683static nxt_unit_mmap_buf_t * 1684nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1685{ |
1385 nxt_queue_link_t *lnk; | |
1386 nxt_unit_mmap_buf_t *mmap_buf; 1387 nxt_unit_ctx_impl_t *ctx_impl; 1388 1389 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1390 | 1686 nxt_unit_mmap_buf_t *mmap_buf; 1687 nxt_unit_ctx_impl_t *ctx_impl; 1688 1689 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1690 |
1391 if (nxt_queue_is_empty(&ctx_impl->free_buf)) { | 1691 if (ctx_impl->free_buf == NULL) { |
1392 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1393 if (nxt_slow_path(mmap_buf == NULL)) { 1394 nxt_unit_warn(ctx, "failed to allocate buf"); 1395 } 1396 1397 } else { | 1692 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1693 if (nxt_slow_path(mmap_buf == NULL)) { 1694 nxt_unit_warn(ctx, "failed to allocate buf"); 1695 } 1696 1697 } else { |
1398 lnk = nxt_queue_first(&ctx_impl->free_buf); 1399 nxt_queue_remove(lnk); | 1698 mmap_buf = ctx_impl->free_buf; |
1400 | 1699 |
1401 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); | 1700 nxt_unit_mmap_buf_remove(mmap_buf); |
1402 } 1403 1404 mmap_buf->ctx_impl = ctx_impl; 1405 1406 return mmap_buf; 1407} 1408 1409 1410static void 1411nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1412{ | 1701 } 1702 1703 mmap_buf->ctx_impl = ctx_impl; 1704 1705 return mmap_buf; 1706} 1707 1708 1709static void 1710nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1711{ |
1413 nxt_queue_remove(&mmap_buf->link); | 1712 nxt_unit_mmap_buf_remove(mmap_buf); |
1414 | 1713 |
1415 nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link); | 1714 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); |
1416} 1417 1418 | 1715} 1716 1717 |
1718typedef struct { 1719 size_t len; 1720 const char *str; 1721} nxt_unit_str_t; 1722 1723 1724#define nxt_unit_str(str) { nxt_length(str), str } 1725 1726 |
|
1419int | 1727int |
1728nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 1729{ 1730 return req->request->websocket_handshake; 1731} 1732 1733 1734int 1735nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 1736{ 1737 int rc; 1738 nxt_unit_ctx_impl_t *ctx_impl; 1739 nxt_unit_request_info_impl_t *req_impl; 1740 1741 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1742 1743 if (nxt_slow_path(req_impl->websocket != 0)) { 1744 nxt_unit_req_debug(req, "upgrade: already upgraded"); 1745 1746 return NXT_UNIT_OK; 1747 } 1748 1749 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1750 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 1751 1752 return NXT_UNIT_ERROR; 1753 } 1754 1755 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1756 nxt_unit_req_warn(req, "upgrade: response already sent"); 1757 1758 return NXT_UNIT_ERROR; 1759 } 1760 1761 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1762 1763 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); 1764 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1765 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 1766 1767 return NXT_UNIT_ERROR; 1768 } 1769 1770 req_impl->websocket = 1; 1771 1772 req->response->status = 101; 1773 1774 return NXT_UNIT_OK; 1775} 1776 1777 1778int 1779nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 1780{ 1781 nxt_unit_request_info_impl_t *req_impl; 1782 1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1784 1785 return req_impl->websocket; 1786} 1787 1788 1789nxt_unit_request_info_t * 1790nxt_unit_get_request_info_from_data(void *data) 1791{ 1792 nxt_unit_request_info_impl_t *req_impl; 1793 1794 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 1795 1796 return &req_impl->req; 1797} 1798 1799 1800int |
|
1420nxt_unit_buf_send(nxt_unit_buf_t *buf) 1421{ 1422 int rc; 1423 nxt_unit_mmap_buf_t *mmap_buf; 1424 nxt_unit_request_info_t *req; 1425 nxt_unit_request_info_impl_t *req_impl; 1426 1427 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); --- 12 unchanged lines hidden (view full) --- 1440 1441 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 1442 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 1443 1444 return NXT_UNIT_ERROR; 1445 } 1446 1447 if (nxt_fast_path(buf->free > buf->start)) { | 1801nxt_unit_buf_send(nxt_unit_buf_t *buf) 1802{ 1803 int rc; 1804 nxt_unit_mmap_buf_t *mmap_buf; 1805 nxt_unit_request_info_t *req; 1806 nxt_unit_request_info_impl_t *req_impl; 1807 1808 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); --- 12 unchanged lines hidden (view full) --- 1821 1822 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 1823 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 1824 1825 return NXT_UNIT_ERROR; 1826 } 1827 1828 if (nxt_fast_path(buf->free > buf->start)) { |
1448 rc = nxt_unit_mmap_buf_send(req->ctx, 1449 req_impl->recv_msg.port_msg.stream, 1450 mmap_buf, 0); | 1829 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); |
1451 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1452 return rc; 1453 } 1454 } 1455 1456 nxt_unit_mmap_buf_release(mmap_buf); 1457 1458 return NXT_UNIT_OK; --- 8 unchanged lines hidden (view full) --- 1467 nxt_unit_request_info_t *req; 1468 nxt_unit_request_info_impl_t *req_impl; 1469 1470 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1471 1472 req = mmap_buf->req; 1473 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1474 | 1830 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1831 return rc; 1832 } 1833 } 1834 1835 nxt_unit_mmap_buf_release(mmap_buf); 1836 1837 return NXT_UNIT_OK; --- 8 unchanged lines hidden (view full) --- 1846 nxt_unit_request_info_t *req; 1847 nxt_unit_request_info_impl_t *req_impl; 1848 1849 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1850 1851 req = mmap_buf->req; 1852 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1853 |
1475 rc = nxt_unit_mmap_buf_send(req->ctx, 1476 req_impl->recv_msg.port_msg.stream, 1477 mmap_buf, 1); 1478 | 1854 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); |
1479 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 1480 nxt_unit_mmap_buf_release(mmap_buf); 1481 1482 nxt_unit_request_info_release(req); 1483 1484 } else { 1485 nxt_unit_request_done(req, rc); 1486 } --- 14 unchanged lines hidden (view full) --- 1501 nxt_chunk_id_t first_free_chunk; 1502 nxt_unit_buf_t *buf; 1503 nxt_unit_impl_t *lib; 1504 nxt_port_mmap_header_t *hdr; 1505 1506 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1507 1508 buf = &mmap_buf->buf; | 1855 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 1856 nxt_unit_mmap_buf_release(mmap_buf); 1857 1858 nxt_unit_request_info_release(req); 1859 1860 } else { 1861 nxt_unit_request_done(req, rc); 1862 } --- 14 unchanged lines hidden (view full) --- 1877 nxt_chunk_id_t first_free_chunk; 1878 nxt_unit_buf_t *buf; 1879 nxt_unit_impl_t *lib; 1880 nxt_port_mmap_header_t *hdr; 1881 1882 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1883 1884 buf = &mmap_buf->buf; |
1885 hdr = mmap_buf->hdr; |
|
1509 1510 m.mmap_msg.size = buf->free - buf->start; 1511 1512 m.msg.stream = stream; 1513 m.msg.pid = lib->pid; 1514 m.msg.reply_port = 0; 1515 m.msg.type = _NXT_PORT_MSG_DATA; 1516 m.msg.last = last != 0; | 1886 1887 m.mmap_msg.size = buf->free - buf->start; 1888 1889 m.msg.stream = stream; 1890 m.msg.pid = lib->pid; 1891 m.msg.reply_port = 0; 1892 m.msg.type = _NXT_PORT_MSG_DATA; 1893 m.msg.last = last != 0; |
1517 m.msg.mmap = m.mmap_msg.size > 0; | 1894 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; |
1518 m.msg.nf = 0; 1519 m.msg.mf = 0; 1520 m.msg.tracking = 0; 1521 | 1895 m.msg.nf = 0; 1896 m.msg.mf = 0; 1897 m.msg.tracking = 0; 1898 |
1522 hdr = mmap_buf->hdr; | 1899 if (hdr != NULL) { 1900 m.mmap_msg.mmap_id = hdr->id; 1901 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); 1902 } |
1523 | 1903 |
1524 m.mmap_msg.mmap_id = hdr->id; 1525 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); 1526 | |
1527 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 1528 stream, 1529 (int) m.mmap_msg.mmap_id, 1530 (int) m.mmap_msg.chunk_id, 1531 (int) m.mmap_msg.size); 1532 1533 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, | 1904 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 1905 stream, 1906 (int) m.mmap_msg.mmap_id, 1907 (int) m.mmap_msg.chunk_id, 1908 (int) m.mmap_msg.size); 1909 1910 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, |
1534 m.mmap_msg.size > 0 ? sizeof(m) 1535 : sizeof(m.msg), | 1911 m.msg.mmap ? sizeof(m) : sizeof(m.msg), |
1536 NULL, 0); 1537 if (nxt_slow_path(res != sizeof(m))) { 1538 return NXT_UNIT_ERROR; 1539 } 1540 | 1912 NULL, 0); 1913 if (nxt_slow_path(res != sizeof(m))) { 1914 return NXT_UNIT_ERROR; 1915 } 1916 |
1541 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { | 1917 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { |
1542 last_used = (u_char *) buf->free - 1; 1543 1544 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 1545 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 1546 end = (u_char *) buf->end; 1547 1548 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); 1549 1550 buf->end = (char *) first_free; 1551 } 1552 1553 return NXT_UNIT_OK; 1554} 1555 1556 1557void 1558nxt_unit_buf_free(nxt_unit_buf_t *buf) 1559{ | 1918 last_used = (u_char *) buf->free - 1; 1919 1920 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 1921 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 1922 end = (u_char *) buf->end; 1923 1924 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); 1925 1926 buf->end = (char *) first_free; 1927 } 1928 1929 return NXT_UNIT_OK; 1930} 1931 1932 1933void 1934nxt_unit_buf_free(nxt_unit_buf_t *buf) 1935{ |
1560 nxt_unit_mmap_buf_t *mmap_buf; | 1936 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 1937} |
1561 | 1938 |
1562 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); | |
1563 | 1939 |
1564 nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start); | 1940static void 1941nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 1942{ 1943 if (nxt_fast_path(mmap_buf->hdr != NULL)) { 1944 nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, 1945 mmap_buf->buf.end - mmap_buf->buf.start); 1946 } |
1565 1566 nxt_unit_mmap_buf_release(mmap_buf); 1567} 1568 1569 1570nxt_unit_buf_t * 1571nxt_unit_buf_next(nxt_unit_buf_t *buf) 1572{ | 1947 1948 nxt_unit_mmap_buf_release(mmap_buf); 1949} 1950 1951 1952nxt_unit_buf_t * 1953nxt_unit_buf_next(nxt_unit_buf_t *buf) 1954{ |
1573 nxt_queue_link_t *lnk; 1574 nxt_unit_mmap_buf_t *mmap_buf; 1575 nxt_unit_request_info_impl_t *req_impl; | 1955 nxt_unit_mmap_buf_t *mmap_buf; |
1576 1577 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); | 1956 1957 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); |
1578 req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t, 1579 req); | |
1580 | 1958 |
1581 lnk = &mmap_buf->link; 1582 1583 if (lnk == nxt_queue_last(&req_impl->incoming_buf) 1584 || lnk == nxt_queue_last(&req_impl->outgoing_buf)) 1585 { | 1959 if (mmap_buf->next == NULL) { |
1586 return NULL; 1587 } 1588 | 1960 return NULL; 1961 } 1962 |
1589 lnk = nxt_queue_next(lnk); 1590 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); 1591 1592 return &mmap_buf->buf; | 1963 return &mmap_buf->next->buf; |
1593} 1594 1595 1596uint32_t 1597nxt_unit_buf_max(void) 1598{ 1599 return PORT_MMAP_DATA_SIZE; 1600} --- 8 unchanged lines hidden (view full) --- 1609 1610int 1611nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 1612 size_t size) 1613{ 1614 int rc; 1615 uint32_t part_size; 1616 const char *part_start; | 1964} 1965 1966 1967uint32_t 1968nxt_unit_buf_max(void) 1969{ 1970 return PORT_MMAP_DATA_SIZE; 1971} --- 8 unchanged lines hidden (view full) --- 1980 1981int 1982nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 1983 size_t size) 1984{ 1985 int rc; 1986 uint32_t part_size; 1987 const char *part_start; |
1617 nxt_unit_process_t *process; | |
1618 nxt_unit_mmap_buf_t mmap_buf; 1619 nxt_unit_request_info_impl_t *req_impl; 1620 1621 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1622 1623 part_start = start; 1624 1625 /* Check if response is not send yet. */ --- 10 unchanged lines hidden (view full) --- 1636 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1637 return rc; 1638 } 1639 1640 size -= part_size; 1641 part_start += part_size; 1642 } 1643 | 1988 nxt_unit_mmap_buf_t mmap_buf; 1989 nxt_unit_request_info_impl_t *req_impl; 1990 1991 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1992 1993 part_start = start; 1994 1995 /* Check if response is not send yet. */ --- 10 unchanged lines hidden (view full) --- 2006 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2007 return rc; 2008 } 2009 2010 size -= part_size; 2011 part_start += part_size; 2012 } 2013 |
1644 process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); 1645 if (nxt_slow_path(process == NULL)) { 1646 return NXT_UNIT_ERROR; 1647 } 1648 | |
1649 while (size > 0) { 1650 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 1651 | 2014 while (size > 0) { 2015 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2016 |
1652 rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, 1653 part_size, &mmap_buf); | 2017 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2018 &req->response_port, part_size, 2019 &mmap_buf); |
1654 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1655 return rc; 1656 } 1657 1658 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, 1659 part_start, part_size); 1660 | 2020 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2021 return rc; 2022 } 2023 2024 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, 2025 part_start, part_size); 2026 |
1661 rc = nxt_unit_mmap_buf_send(req->ctx, 1662 req_impl->recv_msg.port_msg.stream, 1663 &mmap_buf, 0); | 2027 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); |
1664 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1665 nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, 1666 mmap_buf.buf.end - mmap_buf.buf.start); 1667 1668 return rc; 1669 } 1670 1671 size -= part_size; --- 89 unchanged lines hidden (view full) --- 1761 1762 return NXT_UNIT_OK; 1763} 1764 1765 1766ssize_t 1767nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 1768{ | 2028 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2029 nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, 2030 mmap_buf.buf.end - mmap_buf.buf.start); 2031 2032 return rc; 2033 } 2034 2035 size -= part_size; --- 89 unchanged lines hidden (view full) --- 2125 2126 return NXT_UNIT_OK; 2127} 2128 2129 2130ssize_t 2131nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 2132{ |
2133 return nxt_unit_buf_read(&req->content_buf, &req->content_length, 2134 dst, size); 2135} 2136 2137 2138static ssize_t 2139nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) 2140{ |
|
1769 u_char *p; 1770 size_t rest, copy, read; 1771 nxt_unit_buf_t *buf; 1772 1773 p = dst; 1774 rest = size; 1775 | 2141 u_char *p; 2142 size_t rest, copy, read; 2143 nxt_unit_buf_t *buf; 2144 2145 p = dst; 2146 rest = size; 2147 |
1776 buf = req->content_buf; | 2148 buf = *b; |
1777 1778 while (buf != NULL) { 1779 copy = buf->end - buf->free; 1780 copy = nxt_min(rest, copy); 1781 1782 p = nxt_cpymem(p, buf->free, copy); 1783 1784 buf->free += copy; --- 5 unchanged lines hidden (view full) --- 1790 } 1791 1792 break; 1793 } 1794 1795 buf = nxt_unit_buf_next(buf); 1796 } 1797 | 2149 2150 while (buf != NULL) { 2151 copy = buf->end - buf->free; 2152 copy = nxt_min(rest, copy); 2153 2154 p = nxt_cpymem(p, buf->free, copy); 2155 2156 buf->free += copy; --- 5 unchanged lines hidden (view full) --- 2162 } 2163 2164 break; 2165 } 2166 2167 buf = nxt_unit_buf_next(buf); 2168 } 2169 |
1798 req->content_buf = buf; | 2170 *b = buf; |
1799 1800 read = size - rest; 1801 | 2171 2172 read = size - rest; 2173 |
1802 req->content_length -= read; | 2174 *len -= read; |
1803 1804 return read; 1805} 1806 1807 1808void 1809nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 1810{ --- 36 unchanged lines hidden (view full) --- 1847 1848 return; 1849 } 1850 1851skip_response_send: 1852 1853 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); 1854 | 2175 2176 return read; 2177} 2178 2179 2180void 2181nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2182{ --- 36 unchanged lines hidden (view full) --- 2219 2220 return; 2221 } 2222 2223skip_response_send: 2224 2225 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); 2226 |
1855 msg.stream = req_impl->recv_msg.port_msg.stream; | 2227 msg.stream = req_impl->stream; |
1856 msg.pid = lib->pid; 1857 msg.reply_port = 0; 1858 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 1859 : _NXT_PORT_MSG_RPC_ERROR; 1860 msg.last = 1; 1861 msg.mmap = 0; 1862 msg.nf = 0; 1863 msg.mf = 0; --- 5 unchanged lines hidden (view full) --- 1869 nxt_unit_req_alert(req, "last message send failed: %s (%d)", 1870 strerror(errno), errno); 1871 } 1872 1873 nxt_unit_request_info_release(req); 1874} 1875 1876 | 2228 msg.pid = lib->pid; 2229 msg.reply_port = 0; 2230 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2231 : _NXT_PORT_MSG_RPC_ERROR; 2232 msg.last = 1; 2233 msg.mmap = 0; 2234 msg.nf = 0; 2235 msg.mf = 0; --- 5 unchanged lines hidden (view full) --- 2241 nxt_unit_req_alert(req, "last message send failed: %s (%d)", 2242 strerror(errno), errno); 2243 } 2244 2245 nxt_unit_request_info_release(req); 2246} 2247 2248 |
2249int 2250nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2251 uint8_t last, const void *start, size_t size) 2252{ 2253 const struct iovec iov = { (void *) start, size }; 2254 2255 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); 2256} 2257 2258 2259int 2260nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 2261 uint8_t last, const struct iovec *iov, int iovcnt) 2262{ 2263 int i, rc; 2264 size_t l, copy; 2265 uint32_t payload_len, buf_size; 2266 const uint8_t *b; 2267 nxt_unit_buf_t *buf; 2268 nxt_websocket_header_t *wh; 2269 2270 payload_len = 0; 2271 2272 for (i = 0; i < iovcnt; i++) { 2273 payload_len += iov[i].iov_len; 2274 } 2275 2276 buf_size = 10 + payload_len; 2277 2278 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2279 PORT_MMAP_DATA_SIZE)); 2280 if (nxt_slow_path(buf == NULL)) { 2281 nxt_unit_req_error(req, "Failed to allocate buf for content"); 2282 2283 return NXT_UNIT_ERROR; 2284 } 2285 2286 buf->start[0] = 0; 2287 buf->start[1] = 0; 2288 2289 wh = (void *) buf->free; 2290 2291 buf->free = nxt_websocket_frame_init(wh, payload_len); 2292 wh->fin = last; 2293 wh->opcode = opcode; 2294 2295 for (i = 0; i < iovcnt; i++) { 2296 b = iov[i].iov_base; 2297 l = iov[i].iov_len; 2298 2299 while (l > 0) { 2300 copy = buf->end - buf->free; 2301 copy = nxt_min(l, copy); 2302 2303 buf->free = nxt_cpymem(buf->free, b, copy); 2304 b += copy; 2305 l -= copy; 2306 2307 if (l > 0) { 2308 buf_size -= buf->end - buf->start; 2309 2310 rc = nxt_unit_buf_send(buf); 2311 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2312 nxt_unit_req_error(req, "Failed to send content"); 2313 2314 return NXT_UNIT_ERROR; 2315 } 2316 2317 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2318 PORT_MMAP_DATA_SIZE)); 2319 if (nxt_slow_path(buf == NULL)) { 2320 nxt_unit_req_error(req, 2321 "Failed to allocate buf for content"); 2322 2323 return NXT_UNIT_ERROR; 2324 } 2325 } 2326 } 2327 } 2328 2329 if (buf->free > buf->start) { 2330 rc = nxt_unit_buf_send(buf); 2331 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2332 nxt_unit_req_error(req, "Failed to send content"); 2333 } 2334 } 2335 2336 return rc; 2337} 2338 2339 2340ssize_t 2341nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 2342 size_t size) 2343{ 2344 ssize_t res; 2345 uint8_t *b; 2346 uint64_t i, d; 2347 2348 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, 2349 dst, size); 2350 2351 if (ws->mask == NULL) { 2352 return res; 2353 } 2354 2355 b = dst; 2356 d = (ws->payload_len - ws->content_length - res) % 4; 2357 2358 for (i = 0; i < (uint64_t) res; i++) { 2359 b[i] ^= ws->mask[ (i + d) % 4 ]; 2360 } 2361 2362 return res; 2363} 2364 2365 2366int 2367nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) 2368{ 2369 char *b; 2370 size_t size; 2371 nxt_unit_websocket_frame_impl_t *ws_impl; 2372 2373 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 2374 2375 if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { 2376 return NXT_UNIT_OK; 2377 } 2378 2379 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; 2380 2381 b = malloc(size); 2382 if (nxt_slow_path(b == NULL)) { 2383 return NXT_UNIT_ERROR; 2384 } 2385 2386 memcpy(b, ws_impl->buf->buf.start, size); 2387 2388 ws_impl->buf->buf.start = b; 2389 ws_impl->buf->buf.free = b; 2390 ws_impl->buf->buf.end = b + size; 2391 2392 ws_impl->retain_buf = b; 2393 2394 return NXT_UNIT_OK; 2395} 2396 2397 2398void 2399nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) 2400{ 2401 nxt_unit_websocket_frame_release(ws); 2402} 2403 2404 |
|
1877static nxt_port_mmap_header_t * 1878nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 1879 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) 1880{ 1881 int res, nchunks, i; 1882 nxt_unit_mmap_t *mm, *mm_end; 1883 nxt_port_mmap_header_t *hdr; 1884 --- 465 unchanged lines hidden (view full) --- 2350 int rc; 2351 nxt_chunk_id_t c; 2352 nxt_unit_process_t *process; 2353 nxt_port_mmap_header_t *hdr; 2354 nxt_port_mmap_tracking_msg_t *tracking_msg; 2355 2356 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 2357 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", | 2405static nxt_port_mmap_header_t * 2406nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2407 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) 2408{ 2409 int res, nchunks, i; 2410 nxt_unit_mmap_t *mm, *mm_end; 2411 nxt_port_mmap_header_t *hdr; 2412 --- 465 unchanged lines hidden (view full) --- 2878 int rc; 2879 nxt_chunk_id_t c; 2880 nxt_unit_process_t *process; 2881 nxt_port_mmap_header_t *hdr; 2882 nxt_port_mmap_tracking_msg_t *tracking_msg; 2883 2884 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 2885 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", |
2358 recv_msg->port_msg.stream, (int) recv_msg->size); | 2886 recv_msg->stream, (int) recv_msg->size); |
2359 2360 return 0; 2361 } 2362 2363 tracking_msg = recv_msg->start; 2364 2365 recv_msg->start = tracking_msg + 1; 2366 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); --- 6 unchanged lines hidden (view full) --- 2373 pthread_mutex_lock(&process->incoming.mutex); 2374 2375 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 2376 if (nxt_slow_path(hdr == NULL)) { 2377 pthread_mutex_unlock(&process->incoming.mutex); 2378 2379 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 2380 "invalid mmap id %d,%"PRIu32, | 2887 2888 return 0; 2889 } 2890 2891 tracking_msg = recv_msg->start; 2892 2893 recv_msg->start = tracking_msg + 1; 2894 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); --- 6 unchanged lines hidden (view full) --- 2901 pthread_mutex_lock(&process->incoming.mutex); 2902 2903 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 2904 if (nxt_slow_path(hdr == NULL)) { 2905 pthread_mutex_unlock(&process->incoming.mutex); 2906 2907 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 2908 "invalid mmap id %d,%"PRIu32, |
2381 recv_msg->port_msg.stream, 2382 (int) process->pid, tracking_msg->mmap_id); | 2909 recv_msg->stream, (int) process->pid, 2910 tracking_msg->mmap_id); |
2383 2384 return 0; 2385 } 2386 2387 c = tracking_msg->tracking_id; | 2911 2912 return 0; 2913 } 2914 2915 c = tracking_msg->tracking_id; |
2388 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0); | 2916 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); |
2389 2390 if (rc == 0) { 2391 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", | 2917 2918 if (rc == 0) { 2919 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", |
2392 recv_msg->port_msg.stream); | 2920 recv_msg->stream); |
2393 2394 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 2395 } 2396 2397 pthread_mutex_unlock(&process->incoming.mutex); 2398 2399 return rc; 2400} 2401 2402 2403static int | 2921 2922 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 2923 } 2924 2925 pthread_mutex_unlock(&process->incoming.mutex); 2926 2927 return rc; 2928} 2929 2930 2931static int |
2404nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, 2405 nxt_queue_t *incoming_buf) | 2932nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) |
2406{ 2407 void *start; 2408 uint32_t size; 2409 nxt_unit_process_t *process; | 2933{ 2934 void *start; 2935 uint32_t size; 2936 nxt_unit_process_t *process; |
2410 nxt_unit_mmap_buf_t *b; | 2937 nxt_unit_mmap_buf_t *b, **incoming_tail; |
2411 nxt_port_mmap_msg_t *mmap_msg, *end; 2412 nxt_port_mmap_header_t *hdr; 2413 2414 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 2415 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", | 2938 nxt_port_mmap_msg_t *mmap_msg, *end; 2939 nxt_port_mmap_header_t *hdr; 2940 2941 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 2942 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", |
2416 recv_msg->port_msg.stream, (int) recv_msg->size); | 2943 recv_msg->stream, (int) recv_msg->size); |
2417 2418 return NXT_UNIT_ERROR; 2419 } 2420 2421 process = nxt_unit_msg_get_process(ctx, recv_msg); 2422 if (nxt_slow_path(process == NULL)) { 2423 return NXT_UNIT_ERROR; 2424 } 2425 2426 mmap_msg = recv_msg->start; 2427 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 2428 | 2944 2945 return NXT_UNIT_ERROR; 2946 } 2947 2948 process = nxt_unit_msg_get_process(ctx, recv_msg); 2949 if (nxt_slow_path(process == NULL)) { 2950 return NXT_UNIT_ERROR; 2951 } 2952 2953 mmap_msg = recv_msg->start; 2954 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 2955 |
2956 incoming_tail = &recv_msg->incoming_buf; 2957 |
|
2429 pthread_mutex_lock(&process->incoming.mutex); 2430 2431 for (; mmap_msg < end; mmap_msg++) { 2432 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 2433 if (nxt_slow_path(hdr == NULL)) { 2434 pthread_mutex_unlock(&process->incoming.mutex); 2435 2436 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 2437 "invalid mmap id %d,%"PRIu32, | 2958 pthread_mutex_lock(&process->incoming.mutex); 2959 2960 for (; mmap_msg < end; mmap_msg++) { 2961 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 2962 if (nxt_slow_path(hdr == NULL)) { 2963 pthread_mutex_unlock(&process->incoming.mutex); 2964 2965 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 2966 "invalid mmap id %d,%"PRIu32, |
2438 recv_msg->port_msg.stream, 2439 (int) process->pid, mmap_msg->mmap_id); | 2967 recv_msg->stream, (int) process->pid, 2968 mmap_msg->mmap_id); |
2440 2441 return NXT_UNIT_ERROR; 2442 } 2443 2444 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 2445 size = mmap_msg->size; 2446 2447 if (recv_msg->start == mmap_msg) { 2448 recv_msg->start = start; 2449 recv_msg->size = size; 2450 } 2451 2452 b = nxt_unit_mmap_buf_get(ctx); 2453 if (nxt_slow_path(b == NULL)) { 2454 pthread_mutex_unlock(&process->incoming.mutex); 2455 | 2969 2970 return NXT_UNIT_ERROR; 2971 } 2972 2973 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 2974 size = mmap_msg->size; 2975 2976 if (recv_msg->start == mmap_msg) { 2977 recv_msg->start = start; 2978 recv_msg->size = size; 2979 } 2980 2981 b = nxt_unit_mmap_buf_get(ctx); 2982 if (nxt_slow_path(b == NULL)) { 2983 pthread_mutex_unlock(&process->incoming.mutex); 2984 |
2456 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 2457 "failed to allocate buf", 2458 recv_msg->port_msg.stream); | 2985 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 2986 recv_msg->stream); |
2459 2460 nxt_unit_mmap_release(hdr, start, size); 2461 2462 return NXT_UNIT_ERROR; 2463 } 2464 | 2987 2988 nxt_unit_mmap_release(hdr, start, size); 2989 2990 return NXT_UNIT_ERROR; 2991 } 2992 |
2465 nxt_queue_insert_tail(incoming_buf, &b->link); | 2993 nxt_unit_mmap_buf_insert(incoming_tail, b); 2994 incoming_tail = &b->next; |
2466 2467 b->buf.start = start; 2468 b->buf.free = start; 2469 b->buf.end = b->buf.start + size; 2470 b->hdr = hdr; 2471 2472 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", | 2995 2996 b->buf.start = start; 2997 b->buf.free = start; 2998 b->buf.end = b->buf.start + size; 2999 b->hdr = hdr; 3000 3001 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", |
2473 recv_msg->port_msg.stream, | 3002 recv_msg->stream, |
2474 start, (int) size, 2475 (int) hdr->src_pid, (int) hdr->dst_pid, 2476 (int) hdr->id, (int) mmap_msg->chunk_id, 2477 (int) mmap_msg->size); 2478 } 2479 2480 pthread_mutex_unlock(&process->incoming.mutex); 2481 --- 198 unchanged lines hidden (view full) --- 2680 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 2681 buf, sizeof(buf), 2682 oob, sizeof(oob)); 2683 } 2684 2685 if (nxt_fast_path(rsize > 0)) { 2686 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, 2687 oob, sizeof(oob)); | 3003 start, (int) size, 3004 (int) hdr->src_pid, (int) hdr->dst_pid, 3005 (int) hdr->id, (int) mmap_msg->chunk_id, 3006 (int) mmap_msg->size); 3007 } 3008 3009 pthread_mutex_unlock(&process->incoming.mutex); 3010 --- 198 unchanged lines hidden (view full) --- 3209 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 3210 buf, sizeof(buf), 3211 oob, sizeof(oob)); 3212 } 3213 3214 if (nxt_fast_path(rsize > 0)) { 3215 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, 3216 oob, sizeof(oob)); |
3217 3218#if (NXT_DEBUG) 3219 memset(buf, 0xAC, rsize); 3220#endif 3221 |
|
2688 } else { 2689 rc = NXT_UNIT_ERROR; 2690 } 2691 2692 return rc; 2693} 2694 2695 --- 74 unchanged lines hidden (view full) --- 2770 2771 return &new_ctx->ctx; 2772} 2773 2774 2775void 2776nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) 2777{ | 3222 } else { 3223 rc = NXT_UNIT_ERROR; 3224 } 3225 3226 return rc; 3227} 3228 3229 --- 74 unchanged lines hidden (view full) --- 3304 3305 return &new_ctx->ctx; 3306} 3307 3308 3309void 3310nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) 3311{ |
2778 nxt_unit_impl_t *lib; 2779 nxt_unit_ctx_impl_t *ctx_impl; 2780 nxt_unit_mmap_buf_t *mmap_buf; 2781 nxt_unit_request_info_impl_t *req_impl; | 3312 nxt_unit_impl_t *lib; 3313 nxt_unit_ctx_impl_t *ctx_impl; 3314 nxt_unit_mmap_buf_t *mmap_buf; 3315 nxt_unit_request_info_impl_t *req_impl; 3316 nxt_unit_websocket_frame_impl_t *ws_impl; |
2782 2783 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2784 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2785 2786 nxt_queue_each(req_impl, &ctx_impl->active_req, 2787 nxt_unit_request_info_impl_t, link) 2788 { 2789 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 2790 2791 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 2792 2793 } nxt_queue_loop; 2794 | 3317 3318 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3319 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3320 3321 nxt_queue_each(req_impl, &ctx_impl->active_req, 3322 nxt_unit_request_info_impl_t, link) 3323 { 3324 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 3325 3326 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 3327 3328 } nxt_queue_loop; 3329 |
2795 nxt_queue_remove(&ctx_impl->ctx_buf[0].link); 2796 nxt_queue_remove(&ctx_impl->ctx_buf[1].link); | 3330 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); 3331 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); |
2797 | 3332 |
2798 nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) { 2799 2800 nxt_queue_remove(&mmap_buf->link); | 3333 while (ctx_impl->free_buf != NULL) { 3334 mmap_buf = ctx_impl->free_buf; 3335 nxt_unit_mmap_buf_remove(mmap_buf); |
2801 free(mmap_buf); | 3336 free(mmap_buf); |
3337 } |
|
2802 | 3338 |
2803 } nxt_queue_loop; 2804 | |
2805 nxt_queue_each(req_impl, &ctx_impl->free_req, 2806 nxt_unit_request_info_impl_t, link) 2807 { 2808 nxt_unit_request_info_free(req_impl); 2809 2810 } nxt_queue_loop; 2811 | 3339 nxt_queue_each(req_impl, &ctx_impl->free_req, 3340 nxt_unit_request_info_impl_t, link) 3341 { 3342 nxt_unit_request_info_free(req_impl); 3343 3344 } nxt_queue_loop; 3345 |
3346 nxt_queue_each(ws_impl, &ctx_impl->free_ws, 3347 nxt_unit_websocket_frame_impl_t, link) 3348 { 3349 nxt_unit_websocket_frame_free(ws_impl); 3350 3351 } nxt_queue_loop; 3352 |
|
2812 nxt_queue_remove(&ctx_impl->link); 2813 2814 if (ctx_impl != &lib->main_ctx) { 2815 free(ctx_impl); 2816 } 2817} 2818 2819 --- 629 unchanged lines hidden (view full) --- 3449 return lhq.value; 3450 3451 default: 3452 return NULL; 3453 } 3454} 3455 3456 | 3353 nxt_queue_remove(&ctx_impl->link); 3354 3355 if (ctx_impl != &lib->main_ctx) { 3356 free(ctx_impl); 3357 } 3358} 3359 3360 --- 629 unchanged lines hidden (view full) --- 3990 return lhq.value; 3991 3992 default: 3993 return NULL; 3994 } 3995} 3996 3997 |
3998static nxt_int_t 3999nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 4000{ 4001 return NXT_OK; 4002} 4003 4004 4005static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { 4006 NXT_LVLHSH_DEFAULT, 4007 nxt_unit_request_hash_test, 4008 nxt_lvlhsh_alloc, 4009 nxt_lvlhsh_free, 4010}; 4011 4012 4013static int 4014nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 4015 nxt_unit_request_info_impl_t *req_impl) 4016{ 4017 uint32_t *stream; 4018 nxt_int_t res; 4019 nxt_lvlhsh_query_t lhq; 4020 4021 stream = &req_impl->stream; 4022 4023 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 4024 lhq.key.length = sizeof(*stream); 4025 lhq.key.start = (u_char *) stream; 4026 lhq.proto = &lvlhsh_requests_proto; 4027 lhq.pool = NULL; 4028 lhq.replace = 0; 4029 lhq.value = req_impl; 4030 4031 res = nxt_lvlhsh_insert(request_hash, &lhq); 4032 4033 switch (res) { 4034 4035 case NXT_OK: 4036 return NXT_UNIT_OK; 4037 4038 default: 4039 return NXT_UNIT_ERROR; 4040 } 4041} 4042 4043 4044static nxt_unit_request_info_impl_t * 4045nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, 4046 int remove) 4047{ 4048 nxt_int_t res; 4049 nxt_lvlhsh_query_t lhq; 4050 4051 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); 4052 lhq.key.length = sizeof(stream); 4053 lhq.key.start = (u_char *) &stream; 4054 lhq.proto = &lvlhsh_requests_proto; 4055 lhq.pool = NULL; 4056 4057 if (remove) { 4058 res = nxt_lvlhsh_delete(request_hash, &lhq); 4059 4060 } else { 4061 res = nxt_lvlhsh_find(request_hash, &lhq); 4062 } 4063 4064 switch (res) { 4065 4066 case NXT_OK: 4067 return lhq.value; 4068 4069 default: 4070 return NULL; 4071 } 4072} 4073 4074 |
|
3457void 3458nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) 3459{ 3460 int log_fd, n; 3461 char msg[NXT_MAX_ERROR_STR], *p, *end; 3462 pid_t pid; 3463 va_list ap; 3464 nxt_unit_impl_t *lib; --- 56 unchanged lines hidden (view full) --- 3521 p = msg; 3522 end = p + sizeof(msg) - 1; 3523 3524 p = nxt_unit_snprint_prefix(p, end, pid, level); 3525 3526 if (nxt_fast_path(req != NULL)) { 3527 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 3528 | 4075void 4076nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) 4077{ 4078 int log_fd, n; 4079 char msg[NXT_MAX_ERROR_STR], *p, *end; 4080 pid_t pid; 4081 va_list ap; 4082 nxt_unit_impl_t *lib; --- 56 unchanged lines hidden (view full) --- 4139 p = msg; 4140 end = p + sizeof(msg) - 1; 4141 4142 p = nxt_unit_snprint_prefix(p, end, pid, level); 4143 4144 if (nxt_fast_path(req != NULL)) { 4145 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 4146 |
3529 p += snprintf(p, end - p, 3530 "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream); | 4147 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); |
3531 } 3532 3533 va_start(ap, fmt); 3534 p += vsnprintf(p, end - p, fmt, ap); 3535 va_end(ap); 3536 3537 if (nxt_slow_path(p > end)) { 3538 memcpy(end - 5, "[...]", 5); --- 77 unchanged lines hidden --- | 4148 } 4149 4150 va_start(ap, fmt); 4151 p += vsnprintf(p, end - p, fmt, ap); 4152 va_end(ap); 4153 4154 if (nxt_slow_path(p > end)) { 4155 memcpy(end - 5, "[...]", 5); --- 77 unchanged lines hidden --- |