Deleted
Added
nxt_unit.c (1545:78836321a126) | nxt_unit.c (1546:06017e6e3a5f) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 37 unchanged lines hidden (view full) --- 46 nxt_unit_mmap_buf_t *mmap_buf); 47nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 48 nxt_unit_mmap_buf_t *mmap_buf); 49nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 50static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 52 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 53static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <stdlib.h> 7 8#include "nxt_main.h" --- 37 unchanged lines hidden (view full) --- 46 nxt_unit_mmap_buf_t *mmap_buf); 47nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 48 nxt_unit_mmap_buf_t *mmap_buf); 49nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 50static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 52 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 53static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); |
54static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); |
|
54static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 55 nxt_unit_recv_msg_t *recv_msg); 56static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 57 nxt_unit_recv_msg_t *recv_msg); 58static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 59 nxt_unit_port_id_t *port_id); 60static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 61 nxt_unit_recv_msg_t *recv_msg); --- 36 unchanged lines hidden (view full) --- 98 nxt_unit_port_t *port, uint32_t size, 99 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 100static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 101 102static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 103nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 104nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 105static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); | 55static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 56 nxt_unit_recv_msg_t *recv_msg); 57static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 58 nxt_unit_recv_msg_t *recv_msg); 59static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 60 nxt_unit_port_id_t *port_id); 61static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 62 nxt_unit_recv_msg_t *recv_msg); --- 36 unchanged lines hidden (view full) --- 99 nxt_unit_port_t *port, uint32_t size, 100 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 101static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 102 103static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 104nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 105nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 106static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); |
106static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 107 nxt_unit_process_t *process, uint32_t id); | |
108static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, | 107static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, |
109 nxt_unit_recv_msg_t *recv_msg); | 108 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 109static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 110 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 111 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); |
110static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, | 112static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, |
111 nxt_unit_recv_msg_t *recv_msg); | 113 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 114static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); |
112static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 113 nxt_unit_process_t *process, 114 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 115static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 116 117static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, 118 pid_t pid); 119static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, --- 115 unchanged lines hidden (view full) --- 235 236 nxt_queue_link_t link; 237 238 nxt_unit_ctx_impl_t *ctx_impl; 239}; 240 241 242struct nxt_unit_read_buf_s { | 115static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 116 nxt_unit_process_t *process, 117 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 118static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 119 120static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, 121 pid_t pid); 122static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, --- 115 unchanged lines hidden (view full) --- 238 239 nxt_queue_link_t link; 240 241 nxt_unit_ctx_impl_t *ctx_impl; 242}; 243 244 245struct nxt_unit_read_buf_s { |
243 nxt_unit_read_buf_t *next; | 246 nxt_queue_link_t link; 247 nxt_unit_ctx_impl_t *ctx_impl; |
244 ssize_t size; 245 char buf[16384]; 246 char oob[256]; 247}; 248 249 250struct nxt_unit_ctx_impl_s { 251 nxt_unit_ctx_t ctx; --- 19 unchanged lines hidden (view full) --- 271 nxt_queue_t active_req; 272 273 /* of nxt_unit_request_info_impl_t */ 274 nxt_lvlhsh_t requests; 275 276 /* of nxt_unit_request_info_impl_t */ 277 nxt_queue_t ready_req; 278 | 248 ssize_t size; 249 char buf[16384]; 250 char oob[256]; 251}; 252 253 254struct nxt_unit_ctx_impl_s { 255 nxt_unit_ctx_t ctx; --- 19 unchanged lines hidden (view full) --- 275 nxt_queue_t active_req; 276 277 /* of nxt_unit_request_info_impl_t */ 278 nxt_lvlhsh_t requests; 279 280 /* of nxt_unit_request_info_impl_t */ 281 nxt_queue_t ready_req; 282 |
279 nxt_unit_read_buf_t *pending_read_head; 280 nxt_unit_read_buf_t **pending_read_tail; 281 nxt_unit_read_buf_t *free_read_buf; | 283 /* of nxt_unit_read_buf_t */ 284 nxt_queue_t pending_rbuf; |
282 | 285 |
286 /* of nxt_unit_read_buf_t */ 287 nxt_queue_t free_rbuf; 288 |
|
283 nxt_unit_mmap_buf_t ctx_buf[2]; 284 nxt_unit_read_buf_t ctx_read_buf; 285 286 nxt_unit_request_info_impl_t req; 287}; 288 289 290struct nxt_unit_impl_s { --- 22 unchanged lines hidden (view full) --- 313}; 314 315 316struct nxt_unit_port_impl_s { 317 nxt_unit_port_t port; 318 319 nxt_atomic_t use_count; 320 | 289 nxt_unit_mmap_buf_t ctx_buf[2]; 290 nxt_unit_read_buf_t ctx_read_buf; 291 292 nxt_unit_request_info_impl_t req; 293}; 294 295 296struct nxt_unit_impl_s { --- 22 unchanged lines hidden (view full) --- 319}; 320 321 322struct nxt_unit_port_impl_s { 323 nxt_unit_port_t port; 324 325 nxt_atomic_t use_count; 326 |
327 /* for nxt_unit_process_t.ports */ |
|
321 nxt_queue_link_t link; 322 nxt_unit_process_t *process; 323 324 /* of nxt_unit_request_info_impl_t */ 325 nxt_queue_t awaiting_req; 326 327 int ready; 328}; 329 330 331struct nxt_unit_mmap_s { 332 nxt_port_mmap_header_t *hdr; | 328 nxt_queue_link_t link; 329 nxt_unit_process_t *process; 330 331 /* of nxt_unit_request_info_impl_t */ 332 nxt_queue_t awaiting_req; 333 334 int ready; 335}; 336 337 338struct nxt_unit_mmap_s { 339 nxt_port_mmap_header_t *hdr; |
340 341 /* of nxt_unit_read_buf_t */ 342 nxt_queue_t awaiting_rbuf; |
|
333}; 334 335 336struct nxt_unit_mmaps_s { 337 pthread_mutex_t mutex; 338 uint32_t size; 339 uint32_t cap; 340 nxt_atomic_t allocated_chunks; 341 nxt_unit_mmap_t *elts; 342}; 343 344 345struct nxt_unit_process_s { 346 pid_t pid; 347 | 343}; 344 345 346struct nxt_unit_mmaps_s { 347 pthread_mutex_t mutex; 348 uint32_t size; 349 uint32_t cap; 350 nxt_atomic_t allocated_chunks; 351 nxt_unit_mmap_t *elts; 352}; 353 354 355struct nxt_unit_process_s { 356 pid_t pid; 357 |
348 nxt_queue_t ports; | 358 nxt_queue_t ports; /* of nxt_unit_port_impl_t */ |
349 350 nxt_unit_mmaps_t incoming; 351 nxt_unit_mmaps_t outgoing; 352 353 nxt_unit_impl_t *lib; 354 355 nxt_atomic_t use_count; 356 --- 175 unchanged lines hidden (view full) --- 532 533 ctx_impl->use_count = 1; 534 ctx_impl->wait_items = 0; 535 536 nxt_queue_init(&ctx_impl->free_req); 537 nxt_queue_init(&ctx_impl->free_ws); 538 nxt_queue_init(&ctx_impl->active_req); 539 nxt_queue_init(&ctx_impl->ready_req); | 359 360 nxt_unit_mmaps_t incoming; 361 nxt_unit_mmaps_t outgoing; 362 363 nxt_unit_impl_t *lib; 364 365 nxt_atomic_t use_count; 366 --- 175 unchanged lines hidden (view full) --- 542 543 ctx_impl->use_count = 1; 544 ctx_impl->wait_items = 0; 545 546 nxt_queue_init(&ctx_impl->free_req); 547 nxt_queue_init(&ctx_impl->free_ws); 548 nxt_queue_init(&ctx_impl->active_req); 549 nxt_queue_init(&ctx_impl->ready_req); |
550 nxt_queue_init(&ctx_impl->pending_rbuf); 551 nxt_queue_init(&ctx_impl->free_rbuf); |
|
540 541 ctx_impl->free_buf = NULL; 542 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 543 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 544 545 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); | 552 553 ctx_impl->free_buf = NULL; 554 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 555 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 556 557 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); |
558 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); |
|
546 | 559 |
547 ctx_impl->pending_read_head = NULL; 548 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 549 ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; 550 ctx_impl->ctx_read_buf.next = NULL; | 560 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; |
551 552 ctx_impl->req.req.ctx = &ctx_impl->ctx; 553 ctx_impl->req.req.unit = &lib->unit; 554 555 ctx_impl->read_port = NULL; 556 ctx_impl->requests.slot = 0; 557 558 return NXT_UNIT_OK; --- 203 unchanged lines hidden (view full) --- 762 if (res != sizeof(msg)) { 763 return NXT_UNIT_ERROR; 764 } 765 766 return NXT_UNIT_OK; 767} 768 769 | 561 562 ctx_impl->req.req.ctx = &ctx_impl->ctx; 563 ctx_impl->req.req.unit = &lib->unit; 564 565 ctx_impl->read_port = NULL; 566 ctx_impl->requests.slot = 0; 567 568 return NXT_UNIT_OK; --- 203 unchanged lines hidden (view full) --- 772 if (res != sizeof(msg)) { 773 return NXT_UNIT_ERROR; 774 } 775 776 return NXT_UNIT_OK; 777} 778 779 |
770int 771nxt_unit_process_msg(nxt_unit_ctx_t *ctx, 772 void *buf, size_t buf_size, void *oob, size_t oob_size) | 780static int 781nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) |
773{ 774 int rc; 775 pid_t pid; 776 struct cmsghdr *cm; 777 nxt_port_msg_t *port_msg; 778 nxt_unit_impl_t *lib; 779 nxt_unit_recv_msg_t recv_msg; 780 781 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 782 783 rc = NXT_UNIT_ERROR; 784 recv_msg.fd = -1; 785 recv_msg.process = NULL; | 782{ 783 int rc; 784 pid_t pid; 785 struct cmsghdr *cm; 786 nxt_port_msg_t *port_msg; 787 nxt_unit_impl_t *lib; 788 nxt_unit_recv_msg_t recv_msg; 789 790 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 791 792 rc = NXT_UNIT_ERROR; 793 recv_msg.fd = -1; 794 recv_msg.process = NULL; |
786 port_msg = buf; 787 cm = oob; | 795 port_msg = (nxt_port_msg_t *) rbuf->buf; 796 cm = (struct cmsghdr *) rbuf->oob; |
788 | 797 |
789 if (oob_size >= CMSG_SPACE(sizeof(int)) 790 && cm->cmsg_len == CMSG_LEN(sizeof(int)) | 798 if (cm->cmsg_len == CMSG_LEN(sizeof(int)) |
791 && cm->cmsg_level == SOL_SOCKET 792 && cm->cmsg_type == SCM_RIGHTS) 793 { 794 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 795 } 796 797 recv_msg.incoming_buf = NULL; 798 | 799 && cm->cmsg_level == SOL_SOCKET 800 && cm->cmsg_type == SCM_RIGHTS) 801 { 802 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 803 } 804 805 recv_msg.incoming_buf = NULL; 806 |
799 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 800 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); | 807 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 808 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); |
801 goto fail; 802 } 803 804 recv_msg.stream = port_msg->stream; 805 recv_msg.pid = port_msg->pid; 806 recv_msg.reply_port = port_msg->reply_port; 807 recv_msg.last = port_msg->last; 808 recv_msg.mmap = port_msg->mmap; 809 810 recv_msg.start = port_msg + 1; | 809 goto fail; 810 } 811 812 recv_msg.stream = port_msg->stream; 813 recv_msg.pid = port_msg->pid; 814 recv_msg.reply_port = port_msg->reply_port; 815 recv_msg.last = port_msg->last; 816 recv_msg.mmap = port_msg->mmap; 817 818 recv_msg.start = port_msg + 1; |
811 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); | 819 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); |
812 813 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 814 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 815 port_msg->stream, (int) port_msg->type); 816 goto fail; 817 } 818 | 820 821 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 822 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 823 port_msg->stream, (int) port_msg->type); 824 goto fail; 825 } 826 |
819 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 820 rc = NXT_UNIT_OK; | 827 if (port_msg->tracking) { 828 rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); |
821 | 829 |
822 goto fail; | 830 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 831 if (rc == NXT_UNIT_AGAIN) { 832 recv_msg.fd = -1; 833 } 834 835 goto fail; 836 } |
823 } 824 825 /* Fragmentation is unsupported. */ 826 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 827 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 828 port_msg->stream, (int) port_msg->type); 829 goto fail; 830 } 831 832 if (port_msg->mmap) { | 837 } 838 839 /* Fragmentation is unsupported. */ 840 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 841 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 842 port_msg->stream, (int) port_msg->type); 843 goto fail; 844 } 845 846 if (port_msg->mmap) { |
833 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { | 847 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 848 849 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 850 if (rc == NXT_UNIT_AGAIN) { 851 recv_msg.fd = -1; 852 } 853 |
834 goto fail; 835 } 836 } 837 838 switch (port_msg->type) { 839 840 case _NXT_PORT_MSG_QUIT: 841 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); --- 230 unchanged lines hidden (view full) --- 1072 (char *) nxt_unit_sptr_get(&r->method), 1073 (int) r->target_length, 1074 (char *) nxt_unit_sptr_get(&r->target), 1075 (int) r->content_length); 1076 1077 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); 1078 1079 res = nxt_unit_request_check_response_port(req, &port_id); | 854 goto fail; 855 } 856 } 857 858 switch (port_msg->type) { 859 860 case _NXT_PORT_MSG_QUIT: 861 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); --- 230 unchanged lines hidden (view full) --- 1092 (char *) nxt_unit_sptr_get(&r->method), 1093 (int) r->target_length, 1094 (char *) nxt_unit_sptr_get(&r->target), 1095 (int) r->content_length); 1096 1097 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); 1098 1099 res = nxt_unit_request_check_response_port(req, &port_id); |
1100 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1101 return NXT_UNIT_ERROR; 1102 } |
|
1080 1081 if (nxt_fast_path(res == NXT_UNIT_OK)) { 1082 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1083 1084 lib->callbacks.request_handler(req); 1085 } 1086 1087 return NXT_UNIT_OK; --- 1283 unchanged lines hidden (view full) --- 2371 } 2372} 2373 2374 2375static nxt_unit_read_buf_t * 2376nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2377{ 2378 nxt_unit_ctx_impl_t *ctx_impl; | 1103 1104 if (nxt_fast_path(res == NXT_UNIT_OK)) { 1105 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1106 1107 lib->callbacks.request_handler(req); 1108 } 1109 1110 return NXT_UNIT_OK; --- 1283 unchanged lines hidden (view full) --- 2394 } 2395} 2396 2397 2398static nxt_unit_read_buf_t * 2399nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2400{ 2401 nxt_unit_ctx_impl_t *ctx_impl; |
2402 nxt_unit_read_buf_t *rbuf; |
|
2379 2380 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2381 2382 pthread_mutex_lock(&ctx_impl->mutex); 2383 | 2403 2404 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2405 2406 pthread_mutex_lock(&ctx_impl->mutex); 2407 |
2384 return nxt_unit_read_buf_get_impl(ctx_impl); | 2408 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2409 2410 pthread_mutex_unlock(&ctx_impl->mutex); 2411 2412 return rbuf; |
2385} 2386 2387 2388static nxt_unit_read_buf_t * 2389nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2390{ | 2413} 2414 2415 2416static nxt_unit_read_buf_t * 2417nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2418{ |
2419 nxt_queue_link_t *link; |
|
2391 nxt_unit_read_buf_t *rbuf; 2392 | 2420 nxt_unit_read_buf_t *rbuf; 2421 |
2393 if (ctx_impl->free_read_buf != NULL) { 2394 rbuf = ctx_impl->free_read_buf; 2395 ctx_impl->free_read_buf = rbuf->next; | 2422 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2423 link = nxt_queue_first(&ctx_impl->free_rbuf); 2424 nxt_queue_remove(link); |
2396 | 2425 |
2397 pthread_mutex_unlock(&ctx_impl->mutex); | 2426 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); |
2398 2399 return rbuf; 2400 } 2401 | 2427 2428 return rbuf; 2429 } 2430 |
2402 pthread_mutex_unlock(&ctx_impl->mutex); 2403 | |
2404 rbuf = malloc(sizeof(nxt_unit_read_buf_t)); 2405 | 2431 rbuf = malloc(sizeof(nxt_unit_read_buf_t)); 2432 |
2433 if (nxt_fast_path(rbuf != NULL)) { 2434 rbuf->ctx_impl = ctx_impl; 2435 } 2436 |
|
2406 return rbuf; 2407} 2408 2409 2410static void 2411nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2412 nxt_unit_read_buf_t *rbuf) 2413{ 2414 nxt_unit_ctx_impl_t *ctx_impl; 2415 2416 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2417 2418 pthread_mutex_lock(&ctx_impl->mutex); 2419 | 2437 return rbuf; 2438} 2439 2440 2441static void 2442nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2443 nxt_unit_read_buf_t *rbuf) 2444{ 2445 nxt_unit_ctx_impl_t *ctx_impl; 2446 2447 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2448 2449 pthread_mutex_lock(&ctx_impl->mutex); 2450 |
2420 rbuf->next = ctx_impl->free_read_buf; 2421 ctx_impl->free_read_buf = rbuf; | 2451 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); |
2422 2423 pthread_mutex_unlock(&ctx_impl->mutex); 2424} 2425 2426 2427nxt_unit_buf_t * 2428nxt_unit_buf_next(nxt_unit_buf_t *buf) 2429{ --- 820 unchanged lines hidden (view full) --- 3250 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { 3251 nxt_unit_read_buf_release(ctx, rbuf); 3252 3253 break; 3254 } 3255 3256 pthread_mutex_lock(&ctx_impl->mutex); 3257 | 2452 2453 pthread_mutex_unlock(&ctx_impl->mutex); 2454} 2455 2456 2457nxt_unit_buf_t * 2458nxt_unit_buf_next(nxt_unit_buf_t *buf) 2459{ --- 820 unchanged lines hidden (view full) --- 3280 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { 3281 nxt_unit_read_buf_release(ctx, rbuf); 3282 3283 break; 3284 } 3285 3286 pthread_mutex_lock(&ctx_impl->mutex); 3287 |
3258 *ctx_impl->pending_read_tail = rbuf; 3259 ctx_impl->pending_read_tail = &rbuf->next; 3260 rbuf->next = NULL; | 3288 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); |
3261 3262 pthread_mutex_unlock(&ctx_impl->mutex); 3263 3264 if (port_msg->type == _NXT_PORT_MSG_QUIT) { 3265 nxt_unit_debug(ctx, "oosm: quit received"); 3266 3267 return NXT_UNIT_ERROR; 3268 } 3269 } 3270 3271 return NXT_UNIT_OK; 3272} 3273 3274 3275static nxt_unit_mmap_t * 3276nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3277{ | 3289 3290 pthread_mutex_unlock(&ctx_impl->mutex); 3291 3292 if (port_msg->type == _NXT_PORT_MSG_QUIT) { 3293 nxt_unit_debug(ctx, "oosm: quit received"); 3294 3295 return NXT_UNIT_ERROR; 3296 } 3297 } 3298 3299 return NXT_UNIT_OK; 3300} 3301 3302 3303static nxt_unit_mmap_t * 3304nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3305{ |
3278 uint32_t cap; | 3306 uint32_t cap, n; 3307 nxt_unit_mmap_t *e; |
3279 | 3308 |
3309 if (nxt_fast_path(mmaps->size > i)) { 3310 return mmaps->elts + i; 3311 } 3312 |
|
3280 cap = mmaps->cap; 3281 3282 if (cap == 0) { 3283 cap = i + 1; 3284 } 3285 3286 while (i + 1 > cap) { 3287 3288 if (cap < 16) { 3289 cap = cap * 2; 3290 3291 } else { 3292 cap = cap + cap / 2; 3293 } 3294 } 3295 3296 if (cap != mmaps->cap) { 3297 | 3313 cap = mmaps->cap; 3314 3315 if (cap == 0) { 3316 cap = i + 1; 3317 } 3318 3319 while (i + 1 > cap) { 3320 3321 if (cap < 16) { 3322 cap = cap * 2; 3323 3324 } else { 3325 cap = cap + cap / 2; 3326 } 3327 } 3328 3329 if (cap != mmaps->cap) { 3330 |
3298 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 3299 if (nxt_slow_path(mmaps->elts == NULL)) { | 3331 e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); 3332 if (nxt_slow_path(e == NULL)) { |
3300 return NULL; 3301 } 3302 | 3333 return NULL; 3334 } 3335 |
3303 memset(mmaps->elts + mmaps->cap, 0, 3304 sizeof(*mmaps->elts) * (cap - mmaps->cap)); | 3336 mmaps->elts = e; |
3305 | 3337 |
3338 for (n = mmaps->cap; n < cap; n++) { 3339 e = mmaps->elts + n; 3340 3341 e->hdr = NULL; 3342 nxt_queue_init(&e->awaiting_rbuf); 3343 } 3344 |
|
3306 mmaps->cap = cap; 3307 } 3308 3309 if (i + 1 > mmaps->size) { 3310 mmaps->size = i + 1; 3311 } 3312 3313 return mmaps->elts + i; --- 262 unchanged lines hidden (view full) --- 3576 3577 return NXT_UNIT_OK; 3578} 3579 3580 3581static int 3582nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3583{ | 3345 mmaps->cap = cap; 3346 } 3347 3348 if (i + 1 > mmaps->size) { 3349 mmaps->size = i + 1; 3350 } 3351 3352 return mmaps->elts + i; --- 262 unchanged lines hidden (view full) --- 3615 3616 return NXT_UNIT_OK; 3617} 3618 3619 3620static int 3621nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3622{ |
3584 int rc; 3585 void *mem; 3586 struct stat mmap_stat; 3587 nxt_unit_mmap_t *mm; 3588 nxt_unit_impl_t *lib; 3589 nxt_unit_process_t *process; 3590 nxt_port_mmap_header_t *hdr; | 3623 int rc; 3624 void *mem; 3625 nxt_queue_t awaiting_rbuf; 3626 struct stat mmap_stat; 3627 nxt_unit_mmap_t *mm; 3628 nxt_unit_impl_t *lib; 3629 nxt_unit_process_t *process; 3630 nxt_unit_ctx_impl_t *ctx_impl; 3631 nxt_unit_read_buf_t *rbuf; 3632 nxt_port_mmap_header_t *hdr; |
3591 3592 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3593 3594 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3595 3596 pthread_mutex_lock(&lib->mutex); 3597 3598 process = nxt_unit_process_find(lib, pid, 0); --- 22 unchanged lines hidden (view full) --- 3621 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3622 strerror(errno), errno); 3623 3624 goto fail; 3625 } 3626 3627 hdr = mem; 3628 | 3633 3634 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3635 3636 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3637 3638 pthread_mutex_lock(&lib->mutex); 3639 3640 process = nxt_unit_process_find(lib, pid, 0); --- 22 unchanged lines hidden (view full) --- 3663 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3664 strerror(errno), errno); 3665 3666 goto fail; 3667 } 3668 3669 hdr = mem; 3670 |
3629 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { | 3671 if (nxt_slow_path(hdr->src_pid != pid)) { |
3630 3631 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 3632 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3633 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3634 3635 munmap(mem, PORT_MMAP_SIZE); 3636 3637 goto fail; 3638 } 3639 | 3672 3673 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 3674 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3675 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3676 3677 munmap(mem, PORT_MMAP_SIZE); 3678 3679 goto fail; 3680 } 3681 |
3682 nxt_queue_init(&awaiting_rbuf); 3683 |
|
3640 pthread_mutex_lock(&process->incoming.mutex); 3641 3642 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 3643 if (nxt_slow_path(mm == NULL)) { 3644 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 3645 3646 munmap(mem, PORT_MMAP_SIZE); 3647 3648 } else { 3649 mm->hdr = hdr; 3650 3651 hdr->sent_over = 0xFFFFu; 3652 | 3684 pthread_mutex_lock(&process->incoming.mutex); 3685 3686 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 3687 if (nxt_slow_path(mm == NULL)) { 3688 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 3689 3690 munmap(mem, PORT_MMAP_SIZE); 3691 3692 } else { 3693 mm->hdr = hdr; 3694 3695 hdr->sent_over = 0xFFFFu; 3696 |
3697 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 3698 nxt_queue_init(&mm->awaiting_rbuf); 3699 |
|
3653 rc = NXT_UNIT_OK; 3654 } 3655 3656 pthread_mutex_unlock(&process->incoming.mutex); 3657 | 3700 rc = NXT_UNIT_OK; 3701 } 3702 3703 pthread_mutex_unlock(&process->incoming.mutex); 3704 |
3705 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 3706 3707 ctx_impl = rbuf->ctx_impl; 3708 3709 pthread_mutex_lock(&ctx_impl->mutex); 3710 3711 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 3712 3713 pthread_mutex_unlock(&ctx_impl->mutex); 3714 3715 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 3716 3717 } nxt_queue_loop; 3718 |
|
3658fail: 3659 3660 nxt_unit_process_release(process); 3661 3662 return rc; 3663} 3664 3665 --- 48 unchanged lines hidden (view full) --- 3714 3715 free(mmaps->elts); 3716 } 3717 3718 pthread_mutex_destroy(&mmaps->mutex); 3719} 3720 3721 | 3719fail: 3720 3721 nxt_unit_process_release(process); 3722 3723 return rc; 3724} 3725 3726 --- 48 unchanged lines hidden (view full) --- 3775 3776 free(mmaps->elts); 3777 } 3778 3779 pthread_mutex_destroy(&mmaps->mutex); 3780} 3781 3782 |
3722static nxt_port_mmap_header_t * 3723nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3724 uint32_t id) 3725{ 3726 nxt_port_mmap_header_t *hdr; 3727 3728 if (nxt_fast_path(process->incoming.size > id)) { 3729 hdr = process->incoming.elts[id].hdr; 3730 3731 } else { 3732 hdr = NULL; 3733 } 3734 3735 return hdr; 3736} 3737 3738 | |
3739static int | 3783static int |
3740nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) | 3784nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, 3785 nxt_unit_read_buf_t *rbuf) |
3741{ | 3786{ |
3742 int rc; | 3787 int res; |
3743 nxt_chunk_id_t c; 3744 nxt_unit_process_t *process; 3745 nxt_port_mmap_header_t *hdr; 3746 nxt_port_mmap_tracking_msg_t *tracking_msg; 3747 3748 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 3749 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", 3750 recv_msg->stream, (int) recv_msg->size); 3751 | 3788 nxt_chunk_id_t c; 3789 nxt_unit_process_t *process; 3790 nxt_port_mmap_header_t *hdr; 3791 nxt_port_mmap_tracking_msg_t *tracking_msg; 3792 3793 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 3794 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", 3795 recv_msg->stream, (int) recv_msg->size); 3796 |
3752 return 0; | 3797 return NXT_UNIT_ERROR; |
3753 } 3754 3755 tracking_msg = recv_msg->start; 3756 3757 recv_msg->start = tracking_msg + 1; 3758 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); 3759 3760 process = nxt_unit_msg_get_process(ctx, recv_msg); 3761 if (nxt_slow_path(process == NULL)) { | 3798 } 3799 3800 tracking_msg = recv_msg->start; 3801 3802 recv_msg->start = tracking_msg + 1; 3803 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); 3804 3805 process = nxt_unit_msg_get_process(ctx, recv_msg); 3806 if (nxt_slow_path(process == NULL)) { |
3762 return 0; | 3807 return NXT_UNIT_ERROR; |
3763 } 3764 3765 pthread_mutex_lock(&process->incoming.mutex); 3766 | 3808 } 3809 3810 pthread_mutex_lock(&process->incoming.mutex); 3811 |
3767 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 3768 if (nxt_slow_path(hdr == NULL)) { 3769 pthread_mutex_unlock(&process->incoming.mutex); | 3812 res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, 3813 recv_msg->pid, tracking_msg->mmap_id, 3814 &hdr, rbuf); |
3770 | 3815 |
3771 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 3772 "invalid mmap id %d,%"PRIu32, 3773 recv_msg->stream, (int) process->pid, 3774 tracking_msg->mmap_id); 3775 3776 return 0; | 3816 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3817 return res; |
3777 } 3778 3779 c = tracking_msg->tracking_id; | 3818 } 3819 3820 c = tracking_msg->tracking_id; |
3780 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); | 3821 res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); |
3781 | 3822 |
3782 if (rc == 0) { | 3823 if (res == 0) { |
3783 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 3784 recv_msg->stream); 3785 3786 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); | 3824 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 3825 recv_msg->stream); 3826 3827 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); |
3828 3829 res = NXT_UNIT_CANCELLED; 3830 3831 } else { 3832 res = NXT_UNIT_OK; |
|
3787 } 3788 3789 pthread_mutex_unlock(&process->incoming.mutex); 3790 | 3833 } 3834 3835 pthread_mutex_unlock(&process->incoming.mutex); 3836 |
3791 return rc; | 3837 return res; |
3792} 3793 3794 3795static int | 3838} 3839 3840 3841static int |
3796nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) | 3842nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, 3843 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, 3844 nxt_unit_read_buf_t *rbuf) |
3797{ | 3845{ |
3846 int res, need_rbuf; 3847 nxt_unit_mmap_t *mm; 3848 nxt_unit_ctx_impl_t *ctx_impl; 3849 3850 mm = nxt_unit_mmap_at(mmaps, id); 3851 if (nxt_slow_path(mm == NULL)) { 3852 nxt_unit_alert(ctx, "failed to allocate mmap"); 3853 3854 pthread_mutex_unlock(&mmaps->mutex); 3855 3856 *hdr = NULL; 3857 3858 return NXT_UNIT_ERROR; 3859 } 3860 3861 *hdr = mm->hdr; 3862 3863 if (nxt_fast_path(*hdr != NULL)) { 3864 return NXT_UNIT_OK; 3865 } 3866 3867 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); 3868 3869 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); 3870 3871 pthread_mutex_unlock(&mmaps->mutex); 3872 3873 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3874 3875 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 3876 3877 if (need_rbuf) { 3878 res = nxt_unit_get_mmap(ctx, pid, id); 3879 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 3880 return NXT_UNIT_ERROR; 3881 } 3882 } 3883 3884 return NXT_UNIT_AGAIN; 3885} 3886 3887 3888static int 3889nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, 3890 nxt_unit_read_buf_t *rbuf) 3891{ 3892 int res; |
|
3798 void *start; 3799 uint32_t size; | 3893 void *start; 3894 uint32_t size; |
3895 nxt_unit_mmaps_t *mmaps; |
|
3800 nxt_unit_process_t *process; 3801 nxt_unit_mmap_buf_t *b, **incoming_tail; 3802 nxt_port_mmap_msg_t *mmap_msg, *end; 3803 nxt_port_mmap_header_t *hdr; 3804 3805 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 3806 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 3807 recv_msg->stream, (int) recv_msg->size); --- 6 unchanged lines hidden (view full) --- 3814 return NXT_UNIT_ERROR; 3815 } 3816 3817 mmap_msg = recv_msg->start; 3818 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3819 3820 incoming_tail = &recv_msg->incoming_buf; 3821 | 3896 nxt_unit_process_t *process; 3897 nxt_unit_mmap_buf_t *b, **incoming_tail; 3898 nxt_port_mmap_msg_t *mmap_msg, *end; 3899 nxt_port_mmap_header_t *hdr; 3900 3901 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 3902 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 3903 recv_msg->stream, (int) recv_msg->size); --- 6 unchanged lines hidden (view full) --- 3910 return NXT_UNIT_ERROR; 3911 } 3912 3913 mmap_msg = recv_msg->start; 3914 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3915 3916 incoming_tail = &recv_msg->incoming_buf; 3917 |
3918 /* Allocating buffer structures. */ |
|
3822 for (; mmap_msg < end; mmap_msg++) { 3823 b = nxt_unit_mmap_buf_get(ctx); 3824 if (nxt_slow_path(b == NULL)) { 3825 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3826 recv_msg->stream); 3827 | 3919 for (; mmap_msg < end; mmap_msg++) { 3920 b = nxt_unit_mmap_buf_get(ctx); 3921 if (nxt_slow_path(b == NULL)) { 3922 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3923 recv_msg->stream); 3924 |
3925 while (recv_msg->incoming_buf != NULL) { 3926 nxt_unit_mmap_buf_release(recv_msg->incoming_buf); 3927 } 3928 |
|
3828 return NXT_UNIT_ERROR; 3829 } 3830 3831 nxt_unit_mmap_buf_insert(incoming_tail, b); 3832 incoming_tail = &b->next; 3833 } 3834 3835 b = recv_msg->incoming_buf; 3836 mmap_msg = recv_msg->start; 3837 | 3929 return NXT_UNIT_ERROR; 3930 } 3931 3932 nxt_unit_mmap_buf_insert(incoming_tail, b); 3933 incoming_tail = &b->next; 3934 } 3935 3936 b = recv_msg->incoming_buf; 3937 mmap_msg = recv_msg->start; 3938 |
3838 pthread_mutex_lock(&process->incoming.mutex); | 3939 mmaps = &process->incoming; |
3839 | 3940 |
3941 pthread_mutex_lock(&mmaps->mutex); 3942 |
|
3840 for (; mmap_msg < end; mmap_msg++) { | 3943 for (; mmap_msg < end; mmap_msg++) { |
3841 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 3842 if (nxt_slow_path(hdr == NULL)) { 3843 pthread_mutex_unlock(&process->incoming.mutex); | 3944 res = nxt_unit_check_rbuf_mmap(ctx, mmaps, 3945 recv_msg->pid, mmap_msg->mmap_id, 3946 &hdr, rbuf); |
3844 | 3947 |
3845 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 3846 "invalid mmap id %d,%"PRIu32, 3847 recv_msg->stream, (int) process->pid, 3848 mmap_msg->mmap_id); | 3948 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3949 while (recv_msg->incoming_buf != NULL) { 3950 nxt_unit_mmap_buf_release(recv_msg->incoming_buf); 3951 } |
3849 | 3952 |
3850 return NXT_UNIT_ERROR; | 3953 return res; |
3851 } 3852 3853 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3854 size = mmap_msg->size; 3855 3856 if (recv_msg->start == mmap_msg) { 3857 recv_msg->start = start; 3858 recv_msg->size = size; --- 10 unchanged lines hidden (view full) --- 3869 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3870 recv_msg->stream, 3871 start, (int) size, 3872 (int) hdr->src_pid, (int) hdr->dst_pid, 3873 (int) hdr->id, (int) mmap_msg->chunk_id, 3874 (int) mmap_msg->size); 3875 } 3876 | 3954 } 3955 3956 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3957 size = mmap_msg->size; 3958 3959 if (recv_msg->start == mmap_msg) { 3960 recv_msg->start = start; 3961 recv_msg->size = size; --- 10 unchanged lines hidden (view full) --- 3972 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3973 recv_msg->stream, 3974 start, (int) size, 3975 (int) hdr->src_pid, (int) hdr->dst_pid, 3976 (int) hdr->id, (int) mmap_msg->chunk_id, 3977 (int) mmap_msg->size); 3978 } 3979 |
3877 pthread_mutex_unlock(&process->incoming.mutex); | 3980 pthread_mutex_unlock(&mmaps->mutex); |
3878 3879 return NXT_UNIT_OK; 3880} 3881 3882 | 3981 3982 return NXT_UNIT_OK; 3983} 3984 3985 |
3986static int 3987nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) 3988{ 3989 ssize_t res; 3990 nxt_unit_impl_t *lib; 3991 nxt_unit_ctx_impl_t *ctx_impl; 3992 3993 struct { 3994 nxt_port_msg_t msg; 3995 nxt_port_msg_get_mmap_t get_mmap; 3996 } m; 3997 3998 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3999 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4000 4001 memset(&m.msg, 0, sizeof(nxt_port_msg_t)); 4002 4003 m.msg.pid = lib->pid; 4004 m.msg.reply_port = ctx_impl->read_port->id.id; 4005 m.msg.type = _NXT_PORT_MSG_GET_MMAP; 4006 4007 m.get_mmap.id = id; 4008 4009 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); 4010 4011 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); 4012 if (nxt_slow_path(res != sizeof(m))) { 4013 return NXT_UNIT_ERROR; 4014 } 4015 4016 return NXT_UNIT_OK; 4017} 4018 4019 |
|
3883static void 3884nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 3885 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, 3886 void *start, uint32_t size) 3887{ 3888 int freed_chunks; 3889 u_char *p, *end; 3890 nxt_chunk_id_t c; --- 214 unchanged lines hidden (view full) --- 4105 return rc; 4106} 4107 4108 4109int 4110nxt_unit_run_once(nxt_unit_ctx_t *ctx) 4111{ 4112 int rc; | 4020static void 4021nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 4022 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, 4023 void *start, uint32_t size) 4024{ 4025 int freed_chunks; 4026 u_char *p, *end; 4027 nxt_chunk_id_t c; --- 214 unchanged lines hidden (view full) --- 4242 return rc; 4243} 4244 4245 4246int 4247nxt_unit_run_once(nxt_unit_ctx_t *ctx) 4248{ 4249 int rc; |
4250 nxt_queue_link_t *link; |
|
4113 nxt_unit_ctx_impl_t *ctx_impl; 4114 nxt_unit_read_buf_t *rbuf; 4115 4116 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4117 4118 nxt_unit_ctx_use(ctx_impl); 4119 4120 pthread_mutex_lock(&ctx_impl->mutex); 4121 | 4251 nxt_unit_ctx_impl_t *ctx_impl; 4252 nxt_unit_read_buf_t *rbuf; 4253 4254 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4255 4256 nxt_unit_ctx_use(ctx_impl); 4257 4258 pthread_mutex_lock(&ctx_impl->mutex); 4259 |
4122 if (ctx_impl->pending_read_head != NULL) { 4123 rbuf = ctx_impl->pending_read_head; 4124 ctx_impl->pending_read_head = rbuf->next; | 4260 if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { |
4125 | 4261 |
4126 if (ctx_impl->pending_read_tail == &rbuf->next) { 4127 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 4128 } | 4262next_pending: |
4129 | 4263 |
4264 link = nxt_queue_first(&ctx_impl->pending_rbuf); 4265 nxt_queue_remove(link); 4266 4267 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); 4268 |
|
4130 pthread_mutex_unlock(&ctx_impl->mutex); 4131 4132 } else { 4133 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); | 4269 pthread_mutex_unlock(&ctx_impl->mutex); 4270 4271 } else { 4272 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); |
4273 4274 pthread_mutex_unlock(&ctx_impl->mutex); 4275 |
|
4134 if (nxt_slow_path(rbuf == NULL)) { 4135 4136 nxt_unit_ctx_release(ctx_impl); 4137 4138 return NXT_UNIT_ERROR; 4139 } 4140 4141 nxt_unit_read_buf(ctx, rbuf); 4142 } 4143 4144 if (nxt_fast_path(rbuf->size > 0)) { | 4276 if (nxt_slow_path(rbuf == NULL)) { 4277 4278 nxt_unit_ctx_release(ctx_impl); 4279 4280 return NXT_UNIT_ERROR; 4281 } 4282 4283 nxt_unit_read_buf(ctx, rbuf); 4284 } 4285 4286 if (nxt_fast_path(rbuf->size > 0)) { |
4145 rc = nxt_unit_process_msg(ctx, 4146 rbuf->buf, rbuf->size, 4147 rbuf->oob, sizeof(rbuf->oob)); | 4287 rc = nxt_unit_process_msg(ctx, rbuf); |
4148 4149#if (NXT_DEBUG) | 4288 4289#if (NXT_DEBUG) |
4150 memset(rbuf->buf, 0xAC, rbuf->size); | 4290 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { 4291 memset(rbuf->buf, 0xAC, rbuf->size); 4292 } |
4151#endif 4152 4153 } else { 4154 rc = NXT_UNIT_ERROR; 4155 } 4156 | 4293#endif 4294 4295 } else { 4296 rc = NXT_UNIT_ERROR; 4297 } 4298 |
4157 nxt_unit_read_buf_release(ctx, rbuf); | 4299 if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { 4300 rc = NXT_UNIT_OK; |
4158 | 4301 |
4159 nxt_unit_process_ready_req(ctx_impl); | 4302 } else { 4303 nxt_unit_read_buf_release(ctx, rbuf); 4304 } |
4160 | 4305 |
4306 if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { 4307 rc = NXT_UNIT_OK; 4308 } 4309 4310 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 4311 pthread_mutex_lock(&ctx_impl->mutex); 4312 4313 if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { 4314 goto next_pending; 4315 } 4316 4317 pthread_mutex_unlock(&ctx_impl->mutex); 4318 4319 nxt_unit_process_ready_req(ctx_impl); 4320 } 4321 |
|
4161 nxt_unit_ctx_release(ctx_impl); 4162 4163 return rc; 4164} 4165 4166 4167static void 4168nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) --- 1133 unchanged lines hidden --- | 4322 nxt_unit_ctx_release(ctx_impl); 4323 4324 return rc; 4325} 4326 4327 4328static void 4329nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) --- 1133 unchanged lines hidden --- |