1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 #include "nxt_port_queue.h" 11 #include "nxt_app_queue.h" 12 13 #include "nxt_unit.h" 14 #include "nxt_unit_request.h" 15 #include "nxt_unit_response.h" 16 #include "nxt_unit_websocket.h" 17 18 #include "nxt_websocket.h" 19 20 #if (NXT_HAVE_MEMFD_CREATE) 21 #include <linux/memfd.h> 22 #endif 23 24 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 25 #define NXT_UNIT_LOCAL_BUF_SIZE \ 26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 27 28 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 29 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 30 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 31 typedef struct nxt_unit_process_s nxt_unit_process_t; 32 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 33 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 34 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 35 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 36 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 37 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 38 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 39 40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 42 nxt_unit_ctx_impl_t *ctx_impl, void *data); 43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 50 nxt_unit_mmap_buf_t *mmap_buf); 51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 54 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 56 int queue_fd); 57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 58 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 59 nxt_unit_recv_msg_t *recv_msg); 60 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 61 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 62 nxt_unit_recv_msg_t *recv_msg); 63 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 64 nxt_unit_recv_msg_t *recv_msg); 65 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 66 nxt_unit_port_id_t *port_id); 67 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 68 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 69 nxt_unit_recv_msg_t *recv_msg); 70 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 71 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 72 nxt_unit_ctx_t *ctx); 73 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 74 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 75 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 76 nxt_unit_ctx_t *ctx); 77 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 78 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 79 nxt_unit_websocket_frame_impl_t *ws); 80 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 81 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 82 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 83 nxt_unit_mmap_buf_t *mmap_buf, int last); 84 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 85 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 86 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t 
*ctx); 87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 88 nxt_unit_ctx_impl_t *ctx_impl); 89 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 90 nxt_unit_read_buf_t *rbuf); 91 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 92 nxt_unit_request_info_t *req, size_t size); 93 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 94 size_t size); 95 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 96 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 97 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 98 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 99 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 100 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 101 nxt_unit_port_t *port, int n); 102 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 103 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 104 int fd); 105 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 106 nxt_unit_port_t *port, uint32_t size, 107 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 108 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 109 110 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 111 nxt_unit_ctx_impl_t *ctx_impl); 112 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 113 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 114 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 115 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 116 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 117 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 118 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 119 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 120 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 121 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t 
pid, uint32_t id); 122 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 123 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 124 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 125 126 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 127 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 128 pid_t pid, int remove); 129 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 130 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 131 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 132 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 133 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 134 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 135 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 136 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 137 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 138 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 139 nxt_unit_port_t *port); 140 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 141 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 142 143 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 144 nxt_unit_port_t *port, int queue_fd); 145 146 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 147 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 148 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 149 nxt_unit_port_t *port, void *queue); 150 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 151 nxt_queue_t *awaiting_req); 152 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 153 nxt_unit_port_id_t *port_id); 154 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 155 nxt_unit_port_id_t *port_id); 156 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 
157 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 158 nxt_unit_process_t *process); 159 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 160 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 161 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 162 nxt_unit_port_t *port, const void *buf, size_t buf_size, 163 const void *oob, size_t oob_size); 164 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 165 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 166 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 167 nxt_unit_read_buf_t *rbuf); 168 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 169 nxt_unit_read_buf_t *src); 170 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 171 nxt_unit_read_buf_t *rbuf); 172 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 173 nxt_unit_read_buf_t *rbuf); 174 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 175 nxt_unit_read_buf_t *rbuf); 176 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, 177 nxt_unit_read_buf_t *rbuf); 178 nxt_inline int nxt_unit_close(int fd); 179 static int nxt_unit_fd_blocking(int fd); 180 181 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 182 nxt_unit_port_t *port); 183 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 184 nxt_unit_port_id_t *port_id, int remove); 185 186 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 187 nxt_unit_request_info_t *req); 188 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 189 nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 190 191 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 192 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 193 static void nxt_unit_lvlhsh_free(void *data, void *p); 194 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length); 195 196 197 struct 
nxt_unit_mmap_buf_s {
    /* Public buffer view; first member, so a nxt_unit_buf_t pointer
       can be mapped back to the impl with nxt_container_of(). */
    nxt_unit_buf_t           buf;

    /* Intrusive doubly-linked list: prev points at the previous
       element's next field (or the list head pointer). */
    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;       /* shared memory segment header, if mmap-backed */
    nxt_unit_request_info_t  *req;       /* owning request, if attached */
    nxt_unit_ctx_impl_t      *ctx_impl;
    char                     *free_ptr;  /* heap pointer to release with the buffer */
    char                     *plain_ptr;
};


/* Decoded incoming port message, carried through dispatch. */
struct nxt_unit_recv_msg_s {
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;   /* 1 bit */
    uint8_t                  mmap;   /* 1 bit */

    void                     *start;  /* payload start (past nxt_port_msg_t) */
    uint32_t                 size;    /* payload size in bytes */

    int                      fd[2];   /* fds received via SCM_RIGHTS; -1 if none */

    nxt_unit_mmap_buf_t      *incoming_buf;
};


/* Response lifecycle state of a request. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


struct nxt_unit_request_info_impl_s {
    /* Public part; first member for nxt_container_of() mapping. */
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;
    uint8_t                  in_hash;   /* set while linked into ctx requests hash */

    /* for nxt_unit_ctx_impl_t.free_req or active_req */
    nxt_queue_link_t         link;
    /* for nxt_unit_port_impl_t.awaiting_req */
    nxt_queue_link_t         port_wait_link;

    /* Per-request application data area (request_data_size bytes). */
    char                     extra_data[];
};


struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;   /* public part; first member */

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/* Buffer for one recvmsg() from a port: payload plus ancillary data. */
struct nxt_unit_read_buf_s {
    nxt_queue_link_t     link;
    nxt_unit_ctx_impl_t  *ctx_impl;
    ssize_t              size;        /* bytes actually read; 0 means port closed */
    char                 buf[16384];
    char                 oob[256];    /* control messages (SCM_RIGHTS fds) */
};


struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;   /* public part; first member */

    nxt_atomic_t                  use_count;
    nxt_atomic_t                  wait_items;

    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    /* for nxt_unit_impl_t.contexts */
    nxt_queue_link_t              link;

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   ready_req;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   pending_rbuf;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   free_rbuf;

    int                           online;
    int                           ready;

    /* Pre-allocated objects to avoid malloc on the common path. */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    nxt_unit_request_info_impl_t  req;
};


/* One outgoing shared memory segment. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t  *hdr;
    pthread_t               src_thread;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t             awaiting_rbuf;
};


/* Growable array of shared memory segments plus its lock. */
struct nxt_unit_mmaps_s {
    pthread_mutex_t  mutex;
    uint32_t         size;
    uint32_t         cap;
    nxt_atomic_t     allocated_chunks;
    nxt_unit_mmap_t  *elts;
};


/* Library singleton: one per nxt_unit_init() call. */
struct nxt_unit_impl_s {
    nxt_unit_t            unit;       /* public part; first member */
    nxt_unit_callbacks_t  callbacks;

    nxt_atomic_t          use_count;

    uint32_t              request_data_size;
    uint32_t              shm_mmap_limit;   /* max outgoing segments */

    pthread_mutex_t       mutex;

    nxt_lvlhsh_t          processes;  /* of nxt_unit_process_t */
    nxt_lvlhsh_t          ports;      /* of nxt_unit_port_impl_t */

    nxt_unit_port_t       *router_port;
    nxt_unit_port_t       *shared_port;

    nxt_queue_t           contexts;   /* of nxt_unit_ctx_impl_t */

    nxt_unit_mmaps_t      incoming;
    nxt_unit_mmaps_t      outgoing;

    pid_t                 pid;
    int                   log_fd;

    /* Embedded main context; additional contexts are heap-allocated. */
    nxt_unit_ctx_impl_t   main_ctx;
};


struct nxt_unit_port_impl_s {
    nxt_unit_port_t      port;   /* public part; first member */

    nxt_atomic_t         use_count;

    /* for nxt_unit_process_t.ports */
    nxt_queue_link_t     link;
    nxt_unit_process_t   *process;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t          awaiting_req;

    int                  ready;

    void                 *queue;   /* mapped nxt_port_queue_t / nxt_app_queue_t */

    int                  from_socket;
    nxt_unit_read_buf_t  *socket_rbuf;
};


/* Peer process record; owns the ports registered for that pid. */
struct nxt_unit_process_s {
    pid_t             pid;

    nxt_queue_t       ports;   /* of nxt_unit_port_impl_t */
    nxt_unit_impl_t   *lib;

    nxt_atomic_t      use_count;

    uint32_t          next_port_id;
};


/* Explicitly using 32 bit types to avoid possible alignment. */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Public library entry point.  Creates the library singleton, resolves the
 * ready/router/read ports either from the init structure or from the
 * NXT_UNIT_INIT environment variable, maps a shared port queue, registers
 * the ports, and sends the PROCESS_READY message to the ready port.
 * Returns the main context on success, NULL on failure (everything
 * acquired so far is released via the fail path).
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc, queue_fd;
    void             *mem;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, router_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    /* Sentinels for the fail path cleanup below. */
    queue_fd = -1;
    mem = MAP_FAILED;

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        /* Caller supplied the ports explicitly. */
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        router_port = init->router_port;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
                              router_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        /* Ports come from the environment (started by the Unit daemon). */
        rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
                               &lib->log_fd, &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Convert byte limit to a whole number of mmap segments. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                              / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;

    ctx = &lib->main_ctx.ctx;

    rc = nxt_unit_fd_blocking(router_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
    if (nxt_slow_path(lib->router_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add router_port");

        goto fail;
    }

    /* Shared memory segment backing this process's read port queue. */
    queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    rc = nxt_unit_fd_blocking(read_port.in_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
    if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    rc = nxt_unit_fd_blocking(ready_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    /* Announce readiness; the queue fd is passed along via SCM_RIGHTS. */
    rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    /* The ready port and the local queue fd are no longer needed. */
    nxt_unit_close(ready_port.out_fd);
    nxt_unit_close(queue_fd);

    return ctx;

fail:

    if (mem != MAP_FAILED) {
        munmap(mem, sizeof(nxt_port_queue_t));
    }

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* Releasing the main context drops the library reference too. */
    nxt_unit_ctx_release(&lib->main_ctx.ctx);

    return NULL;
}


/*
 * Allocate and initialize the library singleton together with its
 * embedded main context.  The request_data_size extra bytes are the
 * per-request application data area.  Returns NULL on failure.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_unit_malloc(NULL,
                          sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail;
    }

    lib->unit.data = init->data;
    lib->callbacks =
init->callbacks;

    lib->request_data_size = init->request_data_size;
    /* Convert the byte limit to a whole number of mmap segments. */
    lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                          / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;

    nxt_queue_init(&lib->contexts);

    lib->use_count = 0;
    lib->router_port = NULL;
    lib->shared_port = NULL;

    /* Initialize the embedded main context; this takes a lib reference. */
    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    cb = &lib->callbacks;

    /* request_handler is the only mandatory callback. */
    if (cb->request_handler == NULL) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    nxt_unit_mmaps_init(&lib->incoming);
    nxt_unit_mmaps_init(&lib->outgoing);

    return lib;

fail:

    nxt_unit_free(NULL, lib);

    return NULL;
}


/*
 * Initialize a context structure: mutex, queues, pre-allocated buffers,
 * and link it into lib->contexts.  Takes a reference on lib.
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR (only mutex init can fail).
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_lib_use(lib);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    ctx_impl->use_count = 1;
    ctx_impl->wait_items = 0;
    ctx_impl->online = 1;
    ctx_impl->ready = 0;

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);
    nxt_queue_init(&ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->free_rbuf);

    /* Seed the free lists with the embedded, malloc-free objects. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
    nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);

    ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port = NULL;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/* Take a reference on the context. */
nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}


/* Drop a context reference; frees the context when the last one goes. */
nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
    long                 c;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* fetch_add returns the previous value: 1 means we held the last ref. */
    c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_ctx_free(ctx_impl);
    }
}


/* Take a reference on the library singleton. */
nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t *lib)
{
    nxt_atomic_fetch_add(&lib->use_count, 1);
}


/*
 * Drop a library reference.  On the last release, tear everything down:
 * all known processes (and their ports), the global mutex, the router
 * and shared ports, and both mmap registries, then free the struct.
 */
nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t *lib)
{
    long                c;
    nxt_unit_process_t  *process;

    c = nxt_atomic_fetch_add(&lib->use_count, -1);

    if (c == 1) {
        for ( ;; ) {
            pthread_mutex_lock(&lib->mutex);

            process = nxt_unit_process_pop_first(lib);
            if (process == NULL) {
                pthread_mutex_unlock(&lib->mutex);

                break;
            }

            /* NOTE(review): appears to drop the lock inside — confirm
               nxt_unit_remove_process() unlocks lib->mutex. */
            nxt_unit_remove_process(lib, process);
        }

        pthread_mutex_destroy(&lib->mutex);

        if (nxt_fast_path(lib->router_port != NULL)) {
            nxt_unit_port_release(lib->router_port);
        }

        if (nxt_fast_path(lib->shared_port != NULL)) {
            nxt_unit_port_release(lib->shared_port);
        }

        nxt_unit_mmaps_destroy(&lib->incoming);
        nxt_unit_mmaps_destroy(&lib->outgoing);

        nxt_unit_free(NULL, lib);
    }
}


/* Push mmap_buf onto the head of the intrusive doubly-linked list. */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/* Append mmap_buf at the tail by walking to the last next pointer. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/* Detach mmap_buf from its list; safe if prev is NULL (not linked). */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse the NXT_UNIT_INIT environment variable:
 *   "VERSION;STREAM;PID,ID,FD;PID,ID,FD;PID,ID,IN_FD,OUT_FD;LOG_FD,SHM_LIMIT"
 * describing the ready, router and read ports.  Fails on version
 * mismatch or if fewer than 13 fields scan.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, router_fd, read_in_fd, read_out_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, router_pid, read_pid;
    uint32_t  ready_stream, router_id, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    /* The value must start with an exact library version, ';'-terminated. */
    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    rc = sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d,%d;"
                "%d,%"PRIu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &router_pid, &router_id, &router_fd,
                &read_pid, &read_id, &read_in_fd, &read_out_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 13)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    /* Ready port is write-only from this process. */
    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);

    /* Router port is write-only from this process. */
    router_port->in_fd = -1;
    router_port->out_fd = router_fd;
    router_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_in_fd;
    read_port->out_fd = read_out_fd;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Send the PROCESS_READY message on ready_fd, passing queue_fd (the
 * shared port queue) to the peer via an SCM_RIGHTS control message.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     * *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     * dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with
-O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));

    res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
                           &cmsg, sizeof(cmsg));
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Decode one raw port message from rbuf and dispatch it by type.
 * Extracts up to two passed fds from the SCM_RIGHTS control data,
 * validates the header (size, known type, no fragmentation), reads
 * mmap-backed payload if flagged, then routes to the per-type handler.
 * On all paths except NXT_UNIT_AGAIN the fds, any leftover incoming
 * buffers, and the read buffer itself are released here.
 */
static int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                  rc;
    pid_t                pid;
    struct cmsghdr       *cm;
    nxt_port_msg_t       *port_msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_recv_msg_t  recv_msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    recv_msg.fd[0] = -1;
    recv_msg.fd[1] = -1;
    port_msg = (nxt_port_msg_t *) rbuf->buf;
    cm = (struct cmsghdr *) rbuf->oob;

    /* Pick up one or two file descriptors passed via SCM_RIGHTS. */
    if (cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
        }

        if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
        }
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
        /* A zero-size read means the peer closed the port: quit cleanly. */
        if (nxt_slow_path(rbuf->size == 0)) {
            nxt_unit_debug(ctx, "read port closed");

            nxt_unit_quit(ctx);
            rc = NXT_UNIT_OK;
            goto done;
        }

        nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

    nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
                   port_msg->stream, (int) port_msg->type,
                   recv_msg.fd[0], recv_msg.fd[1]);

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* Payload follows the fixed header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    if (port_msg->mmap) {
        /* Payload lives in shared memory; resolve it into incoming_buf. */
        rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* AGAIN: ownership of the fds moved elsewhere; don't close. */
            if (rc == NXT_UNIT_AGAIN) {
                recv_msg.fd[0] = -1;
                recv_msg.fd[1] = -1;
            }

            goto done;
        }
    }

    switch (port_msg->type) {

    case _NXT_PORT_MSG_RPC_READY:
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        nxt_unit_quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_PORT_ACK:
        rc = nxt_unit_ctx_ready(ctx);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /* Log rotation: redirect the log fd to the newly passed file. */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd[0]);

        if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
            nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
                           port_msg->stream, recv_msg.fd[0], lib->log_fd,
                           strerror(errno), errno);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd[0] < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd[0]);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REQ_BODY:
        rc = nxt_unit_process_req_body(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                           (int) sizeof(pid));

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        /* memcpy: the payload may be unaligned for a direct pid_t read. */
        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

done:

    /* Close any received fds that a handler did not take over (-1). */
    if (recv_msg.fd[0] != -1) {
        nxt_unit_close(recv_msg.fd[0]);
    }

    if (recv_msg.fd[1] != -1) {
        nxt_unit_close(recv_msg.fd[1]);
    }

    /* Free unclaimed incoming buffers; freeing unlinks from the list. */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        /* Poison the consumed buffer to catch stale reads in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handle NEW_PORT: register a port announced by the router.  fd[0] is
 * the socket, fd[1] backs the port's shared queue.  The shared port
 * (NXT_UNIT_SHARED_PORT_ID) is read-only and completes startup; other
 * ports are write-only toward the peer.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* Shared request port: owned by this pid, incoming only. */
        nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);

        new_port.in_fd = recv_msg->fd[0];
        new_port.out_fd = -1;

        mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);

    } else {
        if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
                          != NXT_UNIT_OK))
        {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                              new_port_msg->id);

        /* Peer port: outgoing only. */
        new_port.in_fd = -1;
        new_port.out_fd = recv_msg->fd[0];

        mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);
    }

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /* fd[0] ownership moved into the port; prevent the caller's close. */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        lib->shared_port = port;

        /* Receiving the shared port completes context startup. */
        return nxt_unit_ctx_ready(ctx);
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Mark the context ready and invoke the optional ready_handler callback.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t
*ctx) 1198 { 1199 nxt_unit_impl_t *lib; 1200 nxt_unit_ctx_impl_t *ctx_impl; 1201 1202 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1203 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1204 1205 ctx_impl->ready = 1; 1206 1207 if (lib->callbacks.ready_handler) { 1208 return lib->callbacks.ready_handler(ctx); 1209 } 1210 1211 return NXT_UNIT_OK; 1212 } 1213 1214 1215 static int 1216 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1217 { 1218 int res; 1219 nxt_unit_impl_t *lib; 1220 nxt_unit_port_id_t port_id; 1221 nxt_unit_request_t *r; 1222 nxt_unit_mmap_buf_t *b; 1223 nxt_unit_request_info_t *req; 1224 nxt_unit_request_info_impl_t *req_impl; 1225 1226 if (nxt_slow_path(recv_msg->mmap == 0)) { 1227 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 1228 recv_msg->stream); 1229 1230 return NXT_UNIT_ERROR; 1231 } 1232 1233 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 1234 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 1235 "%d expected", recv_msg->stream, (int) recv_msg->size, 1236 (int) sizeof(nxt_unit_request_t)); 1237 1238 return NXT_UNIT_ERROR; 1239 } 1240 1241 req_impl = nxt_unit_request_info_get(ctx); 1242 if (nxt_slow_path(req_impl == NULL)) { 1243 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 1244 recv_msg->stream); 1245 1246 return NXT_UNIT_ERROR; 1247 } 1248 1249 req = &req_impl->req; 1250 1251 req->request = recv_msg->start; 1252 1253 b = recv_msg->incoming_buf; 1254 1255 req->request_buf = &b->buf; 1256 req->response = NULL; 1257 req->response_buf = NULL; 1258 1259 r = req->request; 1260 1261 req->content_length = r->content_length; 1262 1263 req->content_buf = req->request_buf; 1264 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 1265 1266 req_impl->stream = recv_msg->stream; 1267 1268 req_impl->outgoing_buf = NULL; 1269 1270 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1271 b->req = req; 1272 } 
    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* Large request bodies may arrive as an fd; take ownership of it. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;
    req_impl->in_hash = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    /* NXT_UNIT_AGAIN means the reply port is not ready yet; wait for it. */
    res = nxt_unit_request_check_response_port(req, &port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(res == NXT_UNIT_OK)) {
        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            return NXT_UNIT_ERROR;
        }

        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            /* More body to come: register so REQ_BODY messages find us. */
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                return NXT_UNIT_ERROR;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                return NXT_UNIT_OK;
            }
        }

        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a _NXT_PORT_MSG_REQ_BODY message: append the freshly received
 * buffers (and/or body fd) to the request found in the per-context hash,
 * then notify the application via data_handler, or request_handler once
 * the whole body has been accumulated.
 */
static int
nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    uint64_t                 l;
    nxt_unit_impl_t          *lib;
    nxt_unit_mmap_buf_t      *b;
    nxt_unit_request_info_t  *req;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (req == NULL) {
        /* Unknown stream; message is silently dropped. */
        return NXT_UNIT_OK;
    }

    /* l accumulates the total body bytes available after this message. */
    l = req->content_buf->end - req->content_buf->free;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
        l += b->buf.end - b->buf.free;
    }

    if (recv_msg->incoming_buf != NULL) {
        b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);

        /* Find the tail of the request's current buffer chain. */
        while (b->next != NULL) {
            b = b->next;
        }

        /* "Move" incoming buffer list to req_impl. */
        b->next = recv_msg->incoming_buf;
        b->next->prev = &b->next;

        recv_msg->incoming_buf = NULL;
    }

    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.data_handler != NULL) {
        lib->callbacks.data_handler(req);

        return NXT_UNIT_OK;
    }

    /* No data handler: dispatch only once the full body is present. */
    if (req->content_fd != -1 || l == req->content_length) {
        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Ensure the port the response must be sent to exists and is ready.
 * Returns NXT_UNIT_OK when the port can be used immediately,
 * NXT_UNIT_AGAIN when the request has been queued awaiting the port,
 * or NXT_UNIT_ERROR on failure.
 */
static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
    nxt_unit_port_id_t *port_id)
{
    int                           res;
    nxt_unit_ctx_t                *ctx;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_process_t            *process;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_port_impl_t          *port_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1408 1409 pthread_mutex_lock(&lib->mutex); 1410 1411 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 1412 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 1413 1414 if (nxt_fast_path(port != NULL)) { 1415 req->response_port = port; 1416 1417 if (nxt_fast_path(port_impl->ready)) { 1418 pthread_mutex_unlock(&lib->mutex); 1419 1420 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", 1421 (int) port->id.pid, (int) port->id.id); 1422 1423 return NXT_UNIT_OK; 1424 } 1425 1426 nxt_unit_debug(ctx, "check_response_port: " 1427 "port{%d,%d} already requested", 1428 (int) port->id.pid, (int) port->id.id); 1429 1430 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1431 1432 nxt_queue_insert_tail(&port_impl->awaiting_req, 1433 &req_impl->port_wait_link); 1434 1435 pthread_mutex_unlock(&lib->mutex); 1436 1437 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 1438 1439 return NXT_UNIT_AGAIN; 1440 } 1441 1442 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); 1443 if (nxt_slow_path(port_impl == NULL)) { 1444 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", 1445 (int) sizeof(nxt_unit_port_impl_t)); 1446 1447 pthread_mutex_unlock(&lib->mutex); 1448 1449 return NXT_UNIT_ERROR; 1450 } 1451 1452 port = &port_impl->port; 1453 1454 port->id = *port_id; 1455 port->in_fd = -1; 1456 port->out_fd = -1; 1457 port->data = NULL; 1458 1459 res = nxt_unit_port_hash_add(&lib->ports, port); 1460 if (nxt_slow_path(res != NXT_UNIT_OK)) { 1461 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", 1462 port->id.pid, port->id.id); 1463 1464 pthread_mutex_unlock(&lib->mutex); 1465 1466 nxt_unit_free(ctx, port); 1467 1468 return NXT_UNIT_ERROR; 1469 } 1470 1471 process = nxt_unit_process_find(lib, port_id->pid, 0); 1472 if (nxt_slow_path(process == NULL)) { 1473 nxt_unit_alert(ctx, "check_response_port: process %d not found", 1474 port->id.pid); 1475 1476 
        /* remove_flag set: deletes the just-added placeholder from the hash. */
        nxt_unit_port_hash_find(&lib->ports, port_id, 1);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    nxt_queue_insert_tail(&process->ports, &port_impl->link);

    port_impl->process = process;
    port_impl->queue = NULL;
    port_impl->from_socket = 0;
    port_impl->socket_rbuf = NULL;

    nxt_queue_init(&port_impl->awaiting_req);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);

    /*
     * NOTE(review): two references appear to be held — the port hash and
     * req->response_port; confirm against nxt_unit_port_release() semantics.
     */
    port_impl->use_count = 2;
    port_impl->ready = 0;

    req->response_port = port;

    pthread_mutex_unlock(&lib->mutex);

    /* Ask the router for the real port fds; completion arrives async. */
    res = nxt_unit_get_port(ctx, port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    return NXT_UNIT_AGAIN;
}


/*
 * Send a _NXT_PORT_MSG_REQ_HEADERS_ACK for the request so the router knows
 * which port to use for subsequent messages of this stream.
 */
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
    ssize_t                       res;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = ctx_impl->read_port->id.id;
    msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;

    res = nxt_unit_port_send(req->ctx, req->response_port,
                             &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a _NXT_PORT_MSG_WEBSOCKET message: wrap the frame data in a
 * nxt_unit_websocket_frame_t and invoke websocket_handler; on the final
 * message of the stream, invoke close_handler (or finish the request).
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (nxt_slow_path(req == NULL)) {
        return NXT_UNIT_OK;
    }

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* A frame header is at least 2 bytes (RFC 6455). */
    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /* Plain (non-mmap) data: wrap it in a freshly taken buffer. */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* The 4-byte masking key is the last part of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header so content_buf points at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                           ws_impl->ws.header->opcode,
                           ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/* Handle a _NXT_PORT_MSG_SHM_ACK: forward to the optional shm_ack_handler. */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->shm_ack_handler != NULL) {
        cb->shm_ack_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Take a request info structure from the context's free list, or allocate a
 * new one (including lib->request_data_size bytes of application data).
 * The structure is placed on the active_req queue before returning.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Drop the lock while allocating; re-take it for the queue insert. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
                                        + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Return a request info structure to the context's free list, releasing
 * everything it still owns: hash entry, buffers, body fd and response port.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->in_hash) {
        /* remove_flag set: drops the stream from the request hash. */
        nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
    }

    req_impl->websocket = 0;

    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    if (req->response_port != NULL) {
        nxt_unit_port_release(req->response_port);

        req->response_port = NULL;
    }

    req_impl->state = NXT_UNIT_RS_RELEASED;

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/*
 * Free a request info structure, unless it is the one embedded in the
 * context itself (which must not be passed to free()).
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        nxt_unit_free(&ctx_impl->ctx, req_impl);
    }
}


/* Take a websocket frame structure from the free list, or allocate one. */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/* Free the frame's buffers and return it to the context's free list. */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/* Unlink and free a websocket frame structure. */
static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
    nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    nxt_unit_free(ctx, ws_impl);
}


/*
 * Case-insensitive header field name hash; must stay in sync with the
 * router's hashing so precomputed field hashes match.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    /* Fold the 32-bit hash into the 16-bit return type. */
    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorder request fields in place so duplicates of the same header name
 * become adjacent, and record the indexes of Content-Length, Content-Type
 * and Cookie while scanning.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /*
             * Shift fields (i+1 .. j-1) one slot right and move field j
             * next to field i; sptr offsets are relative, so they are
             * adjusted by sizeof(f) per moved slot.
             */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity. */
            nxt_unit_sptr_set(&fields[j].name, name);

            i++;
        }
    }
}


/*
 * Allocate (or reuse) the response buffer and initialize the response
 * header with the given status and field capacity.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        /* Reuse the previous buffer if it is already big enough. */
        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Reserve space for the field table; names/values are appended after. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);
    req->response_max_fields = max_fields_count;
    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;

    return NXT_UNIT_OK;
}


/*
 * Grow the response to a new field capacity: allocate a fresh buffer and
 * copy over all non-skipped fields (and any piggyback content), then swap
 * it in for the old response buffer.
 */
int
nxt_unit_response_realloc(nxt_unit_request_info_t *req,
    uint32_t max_fields_count, uint32_t max_fields_size)
{
    char                          *p;
    uint32_t                      i, buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f, *src;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "realloc: response not init");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "realloc: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
        nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");

        return NXT_UNIT_ERROR;
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        nxt_unit_req_warn(req, "realloc: new buf allocation failed");
        return NXT_UNIT_ERROR;
    }

    resp = (nxt_unit_response_t *) buf->start;

    memset(resp, 0, sizeof(nxt_unit_response_t));

    resp->status = req->response->status;
    resp->content_length = req->response->content_length;

    /* Name/value data starts after the (larger) field table. */
    p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
    f = resp->fields;

    for (i = 0; i < req->response->fields_count; i++) {
        src = req->response->fields + i;

        if (nxt_slow_path(src->skip != 0)) {
            continue;
        }

        if (nxt_slow_path(src->name_length + src->value_length + 2
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enough space for field"
                " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
                i, src, src->name_length, src->value_length);

            goto fail;
        }

        nxt_unit_sptr_set(&f->name, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
        *p++ = '\0';

        nxt_unit_sptr_set(&f->value, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
        *p++ = '\0';

        f->hash = src->hash;
        f->skip = 0;
        f->name_length = src->name_length;
        f->value_length = src->value_length;

        resp->fields_count++;
        f++;
    }

    if (req->response->piggyback_content_length > 0) {
        if (nxt_slow_path(req->response->piggyback_content_length
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enought space for content"
                " #%"PRIu32", %"PRIu32" required",
                i, req->response->piggyback_content_length);

            goto fail;
        }
        resp->piggyback_content_length =
            req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* Swap in the new buffer; the old response buffer is released. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/* Return non-zero once nxt_unit_response_init() has been called. */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
}


/*
 * Append one header field to the initialized (but not yet sent) response.
 * Fails if the field table or the buffer space is exhausted.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    /* '+ 2' accounts for the two NUL terminators appended below. */
    if (nxt_slow_path(name_length + value_length + 2
                      > (uint32_t) (buf->end - buf->free)))
    {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Append body bytes to the response buffer so they are sent together with
 * the headers ("piggyback" content).
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* First chunk: record where the content starts. */
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/* Send the initialized response headers (and piggyback content). */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
nxt_unit_mmap_buf_t *mmap_buf; 2273 nxt_unit_request_info_impl_t *req_impl; 2274 2275 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2276 2277 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2278 nxt_unit_req_warn(req, "send: response is not initialized yet"); 2279 2280 return NXT_UNIT_ERROR; 2281 } 2282 2283 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2284 nxt_unit_req_warn(req, "send: response already sent"); 2285 2286 return NXT_UNIT_ERROR; 2287 } 2288 2289 if (req->request->websocket_handshake && req->response->status == 101) { 2290 nxt_unit_response_upgrade(req); 2291 } 2292 2293 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 2294 req->response->fields_count, 2295 (int) (req->response_buf->free 2296 - req->response_buf->start)); 2297 2298 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 2299 2300 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2301 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 2302 req->response = NULL; 2303 req->response_buf = NULL; 2304 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2305 2306 nxt_unit_mmap_buf_free(mmap_buf); 2307 } 2308 2309 return rc; 2310 } 2311 2312 2313 int 2314 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 2315 { 2316 nxt_unit_request_info_impl_t *req_impl; 2317 2318 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2319 2320 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 2321 } 2322 2323 2324 nxt_unit_buf_t * 2325 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 2326 { 2327 int rc; 2328 nxt_unit_mmap_buf_t *mmap_buf; 2329 nxt_unit_request_info_impl_t *req_impl; 2330 2331 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 2332 nxt_unit_req_warn(req, "response_buf_alloc: " 2333 "requested buffer (%"PRIu32") too big", size); 2334 2335 return NULL; 2336 } 2337 2338 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 2339 2340 req_impl = nxt_container_of(req, 
nxt_unit_request_info_impl_t, req); 2341 2342 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2343 if (nxt_slow_path(mmap_buf == NULL)) { 2344 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 2345 2346 return NULL; 2347 } 2348 2349 mmap_buf->req = req; 2350 2351 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 2352 2353 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2354 size, size, mmap_buf, 2355 NULL); 2356 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2357 nxt_unit_mmap_buf_release(mmap_buf); 2358 2359 return NULL; 2360 } 2361 2362 return &mmap_buf->buf; 2363 } 2364 2365 2366 static nxt_unit_mmap_buf_t * 2367 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 2368 { 2369 nxt_unit_mmap_buf_t *mmap_buf; 2370 nxt_unit_ctx_impl_t *ctx_impl; 2371 2372 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2373 2374 pthread_mutex_lock(&ctx_impl->mutex); 2375 2376 if (ctx_impl->free_buf == NULL) { 2377 pthread_mutex_unlock(&ctx_impl->mutex); 2378 2379 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); 2380 if (nxt_slow_path(mmap_buf == NULL)) { 2381 return NULL; 2382 } 2383 2384 } else { 2385 mmap_buf = ctx_impl->free_buf; 2386 2387 nxt_unit_mmap_buf_unlink(mmap_buf); 2388 2389 pthread_mutex_unlock(&ctx_impl->mutex); 2390 } 2391 2392 mmap_buf->ctx_impl = ctx_impl; 2393 2394 mmap_buf->hdr = NULL; 2395 mmap_buf->free_ptr = NULL; 2396 2397 return mmap_buf; 2398 } 2399 2400 2401 static void 2402 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 2403 { 2404 nxt_unit_mmap_buf_unlink(mmap_buf); 2405 2406 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 2407 2408 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 2409 2410 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 2411 } 2412 2413 2414 int 2415 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 2416 { 2417 return req->request->websocket_handshake; 2418 } 2419 2420 2421 int 2422 nxt_unit_response_upgrade(nxt_unit_request_info_t 
*req)
{
    int                           rc;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* Already upgraded: nothing to do. */
    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    /* The hash lets later WebSocket frames find this request. */
    rc = nxt_unit_request_hash_add(req->ctx, req);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


/* Returns non-zero once the response has been upgraded to WebSocket. */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/*
 * Recovers the request from a pointer to its per-request extra_data area
 * (the inverse of handing extra_data out to application code).
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Sends one response body buffer (not the last one) and releases it.
 * The headers must have been sent first.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));
    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Body buffers may only follow already-sent headers. */
    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    /* Empty buffers are not transmitted, only freed. */
    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    return NXT_UNIT_OK;
}


/*
 * Sends the final ("last") buffer of a response and releases the request;
 * on send failure the request is finished with the error code instead.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                      rc;
    nxt_unit_mmap_buf_t      *mmap_buf;
    nxt_unit_request_info_t  *req;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
    if (nxt_slow_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(mmap_buf);

        nxt_unit_request_info_release(req);

    } else {
        nxt_unit_request_done(req, rc);
    }
}


/*
 * Transmits one buffer to the request's response port, either as a shared
 * memory reference (hdr != NULL) or as a plain inline message.  Consumes
 * the buffer's memory in both success and failure paths (see free_buf).
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    /* Wire format: port message header followed by the mmap reference. */
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type =
_NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Use the shared memory path only when there is data in an mmap. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        /* Shared memory path: send only the (mmap id, chunk, size) triple. */
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* First chunk not covered by the data just sent. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /* Enough room left: shrink the buffer to the unsent chunks. */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Buffer exhausted; detach it from the mmap. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /* Chunks handed to the router are no longer "allocated" locally. */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain path: the port message header is written into the space
         * reserved immediately before buf->start (see plain_ptr setup in
         * nxt_unit_get_outgoing_buf).
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res =
nxt_unit_port_send(req->ctx, req->response_port, 2649 buf->start - sizeof(m.msg), 2650 m.mmap_msg.size + sizeof(m.msg), 2651 NULL, 0); 2652 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2653 goto free_buf; 2654 } 2655 } 2656 2657 rc = NXT_UNIT_OK; 2658 2659 free_buf: 2660 2661 nxt_unit_free_outgoing_buf(mmap_buf); 2662 2663 return rc; 2664 } 2665 2666 2667 void 2668 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2669 { 2670 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2671 } 2672 2673 2674 static void 2675 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2676 { 2677 nxt_unit_free_outgoing_buf(mmap_buf); 2678 2679 nxt_unit_mmap_buf_release(mmap_buf); 2680 } 2681 2682 2683 static void 2684 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2685 { 2686 if (mmap_buf->hdr != NULL) { 2687 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2688 mmap_buf->hdr, mmap_buf->buf.start, 2689 mmap_buf->buf.end - mmap_buf->buf.start); 2690 2691 mmap_buf->hdr = NULL; 2692 2693 return; 2694 } 2695 2696 if (mmap_buf->free_ptr != NULL) { 2697 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); 2698 2699 mmap_buf->free_ptr = NULL; 2700 } 2701 } 2702 2703 2704 static nxt_unit_read_buf_t * 2705 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2706 { 2707 nxt_unit_ctx_impl_t *ctx_impl; 2708 nxt_unit_read_buf_t *rbuf; 2709 2710 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2711 2712 pthread_mutex_lock(&ctx_impl->mutex); 2713 2714 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2715 2716 pthread_mutex_unlock(&ctx_impl->mutex); 2717 2718 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 2719 2720 return rbuf; 2721 } 2722 2723 2724 static nxt_unit_read_buf_t * 2725 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2726 { 2727 nxt_queue_link_t *link; 2728 nxt_unit_read_buf_t *rbuf; 2729 2730 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2731 link = nxt_queue_first(&ctx_impl->free_rbuf); 2732 nxt_queue_remove(link); 
        rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);

        return rbuf;
    }

    rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));

    if (nxt_fast_path(rbuf != NULL)) {
        rbuf->ctx_impl = ctx_impl;
    }

    /* NOTE(review): may return NULL on malloc failure; callers must check. */
    return rbuf;
}


/* Returns a read buffer to the context free list. */
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/* Returns the next buffer in a chain, or NULL at the end. */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *mmap_buf;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    if (mmap_buf->next == NULL) {
        return NULL;
    }

    return &mmap_buf->next->buf;
}


/* Largest single outgoing buffer the library can provide. */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}


/* Smallest shared memory allocation granularity. */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Blocking write of the whole "size" bytes of response body.
 * Returns NXT_UNIT_OK or a positive error code.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    ssize_t  res;

    /* min_size == size forces the non-blocking variant to write it all. */
    res = nxt_unit_response_write_nb(req, start, size, size);

    /* write_nb reports errors as negated error codes. */
    return res < 0 ?
-res : NXT_UNIT_OK;
}


/*
 * Non-blocking response body write: sends at least "min_size" of the
 * "size" bytes at "start".  Returns the number of bytes sent, or a
 * negated NXT_UNIT_* error code.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Piggyback as much content as fits onto the headers buffer. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    /* Send the remainder in PORT_MMAP_DATA_SIZE sized pieces. */
    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /* No space available right now: report what was sent. */
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free =
nxt_cpymem(mmap_buf.buf.free,
                                      part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Streams response body obtained from the read_info->read() callback until
 * read_info->eof becomes true.  Returns NXT_UNIT_OK or an error code.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        /* Fill the free tail of the headers buffer from the callback. */
        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes.
*/ 2931 buf->free += n; 2932 req->response->piggyback_content_length += n; 2933 2934 if (read_info->eof) { 2935 break; 2936 } 2937 } 2938 2939 rc = nxt_unit_response_send(req); 2940 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2941 nxt_unit_req_error(req, "Failed to send headers with content"); 2942 2943 return rc; 2944 } 2945 2946 if (read_info->eof) { 2947 return NXT_UNIT_OK; 2948 } 2949 } 2950 2951 while (!read_info->eof) { 2952 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 2953 read_info->buf_size); 2954 2955 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 2956 2957 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2958 buf_size, buf_size, 2959 &mmap_buf, local_buf); 2960 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2961 return rc; 2962 } 2963 2964 buf = &mmap_buf.buf; 2965 2966 while (!read_info->eof && buf->end > buf->free) { 2967 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2968 if (nxt_slow_path(n < 0)) { 2969 nxt_unit_req_error(req, "Read error"); 2970 2971 nxt_unit_free_outgoing_buf(&mmap_buf); 2972 2973 return NXT_UNIT_ERROR; 2974 } 2975 2976 buf->free += n; 2977 } 2978 2979 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2980 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2981 nxt_unit_req_error(req, "Failed to send content"); 2982 2983 return rc; 2984 } 2985 } 2986 2987 return NXT_UNIT_OK; 2988 } 2989 2990 2991 ssize_t 2992 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 2993 { 2994 ssize_t buf_res, res; 2995 2996 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 2997 dst, size); 2998 2999 if (buf_res < (ssize_t) size && req->content_fd != -1) { 3000 res = read(req->content_fd, dst, size); 3001 if (nxt_slow_path(res < 0)) { 3002 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 3003 strerror(errno), errno); 3004 3005 return res; 3006 } 3007 3008 if (res < (ssize_t) size) { 3009 nxt_unit_close(req->content_fd); 3010 3011 req->content_fd = -1; 3012 } 3013 3014 
req->content_length -= res; 3015 size -= res; 3016 3017 dst = nxt_pointer_to(dst, res); 3018 3019 } else { 3020 res = 0; 3021 } 3022 3023 return buf_res + res; 3024 } 3025 3026 3027 ssize_t 3028 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 3029 { 3030 char *p; 3031 size_t l_size, b_size; 3032 nxt_unit_buf_t *b; 3033 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 3034 3035 if (req->content_length == 0) { 3036 return 0; 3037 } 3038 3039 l_size = 0; 3040 3041 b = req->content_buf; 3042 3043 while (b != NULL) { 3044 b_size = b->end - b->free; 3045 p = memchr(b->free, '\n', b_size); 3046 3047 if (p != NULL) { 3048 p++; 3049 l_size += p - b->free; 3050 break; 3051 } 3052 3053 l_size += b_size; 3054 3055 if (max_size <= l_size) { 3056 break; 3057 } 3058 3059 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 3060 if (mmap_buf->next == NULL 3061 && req->content_fd != -1 3062 && l_size < req->content_length) 3063 { 3064 preread_buf = nxt_unit_request_preread(req, 16384); 3065 if (nxt_slow_path(preread_buf == NULL)) { 3066 return -1; 3067 } 3068 3069 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 3070 } 3071 3072 b = nxt_unit_buf_next(b); 3073 } 3074 3075 return nxt_min(max_size, l_size); 3076 } 3077 3078 3079 static nxt_unit_mmap_buf_t * 3080 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) 3081 { 3082 ssize_t res; 3083 nxt_unit_mmap_buf_t *mmap_buf; 3084 3085 if (req->content_fd == -1) { 3086 nxt_unit_req_alert(req, "preread: content_fd == -1"); 3087 return NULL; 3088 } 3089 3090 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 3091 if (nxt_slow_path(mmap_buf == NULL)) { 3092 nxt_unit_req_alert(req, "preread: failed to allocate buf"); 3093 return NULL; 3094 } 3095 3096 mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size); 3097 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3098 nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); 3099 nxt_unit_mmap_buf_release(mmap_buf); 3100 return NULL; 3101 } 
    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    /* Short read: spilled content is exhausted, close the descriptor. */
    if (res < (ssize_t) size) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Trim the buffer to the bytes actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copies up to "size" bytes out of the buffer chain "*b" into "dst",
 * advancing each buffer's free pointer and decrementing "*len" by the
 * amount copied.  "*b" is left pointing at the last visited buffer.
 * Returns the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    /*
     * NOTE(review): "*b" keeps the last non-NULL buffer rather than the
     * cursor "buf" — presumably so callers can keep chaining buffers onto
     * the tail (see nxt_unit_request_readline_size); confirm before changing.
     */
    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Finishes the request: auto-creates a minimal 200 "text/plain" response
 * if none was initialized, sends the final buffer, or — on error / already
 * sent — emits a bare last message and releases the request.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if
(nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* No response was prepared: synthesize a minimal one. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain", nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the headers buffer as the last message and releases req. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    /* Headers already went out (or an error occurred): send a bare
     * "last" message so the router can finish the stream. */
    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    (void) nxt_unit_port_send(req->ctx, req->response_port,
                              &msg, sizeof(msg), NULL, 0);

    nxt_unit_request_info_release(req);
}


/* Single-fragment convenience wrapper around nxt_unit_websocket_sendv(). */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Builds and sends one WebSocket frame whose payload is gathered from
 * "iov"/"iovcnt", splitting across multiple outgoing buffers as needed.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size, alloc_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_unit_mmap_buf_t     mmap_buf;
    nxt_websocket_header_t  *wh;
    char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    payload_len = 0;

    for (i = 0; i < iovcnt;
i++) {
        payload_len += iov[i].iov_len;
    }

    /* 10 bytes covers the largest WebSocket frame header. */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    /* Clear the first two header bytes before setting individual bits. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    /* Copy the payload, flushing and re-allocating whenever a buffer fills. */
    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    /* Flush whatever remains in the last buffer. */
    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
    }

    return rc;
}


/*
 * Reads up to "size" bytes of WebSocket frame payload into "dst",
 * unmasking in place when the frame carries a mask.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    /* Mask offset: how many payload bytes were consumed before this read. */
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[ (i + d) % 4 ];
    }

    return res;
}


/*
 * Detaches a WebSocket frame from shared/transient memory by copying it
 * into a private heap allocation, so the application may keep it beyond
 * the callback.  No-op if the frame already owns its memory.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, hsize;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* Already backed by private or shared memory owned by this frame. */
    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = nxt_unit_malloc(ws->req->ctx, size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    hsize = nxt_websocket_frame_header_size(b);

    /* Re-point the frame's buffer and header into the copy. */
    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b + hsize;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    ws_impl->ws.header = (nxt_websocket_header_t *) b;

    if (ws_impl->ws.header->mask) {
        /* The 4-byte mask sits at the end of the frame header. */
        ws_impl->ws.mask = (uint8_t *) b + hsize - 4;

    } else {
        ws_impl->ws.mask = NULL;
    }

    return NXT_UNIT_OK;
}


/* Releases a processed WebSocket frame. */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Finds (or creates) an outgoing shared memory segment with at least
 * "min_n" contiguous free chunks, marking up to "*n" chunks busy.
 * On success "*c" is the first chunk id and "*n" the number obtained.
 * Returns the segment header with outgoing.mutex held logic internally;
 * returns NULL when memory is unavailable.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->outgoing.mutex);

retry:

    outgoing_size = lib->outgoing.size;

    mm_end = lib->outgoing.elts + outgoing_size;

    for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Skip segments bound to a different port or another thread. */
        if (hdr->sent_over != 0xFFFFu
            && (hdr->sent_over != port->id.id
                ||
mm->src_thread != pthread_self()))
        {
            continue;
        }

        *c = 0;

        /* Scan for a run of up to *n contiguous free chunks. */
        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run broken: accept it if already long enough. */
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Too short: roll back and continue past the gap. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment is out of shared memory; ask for ACK when freed. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&lib->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by application, but not sent to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition. Non-blocking mode.
 */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        /* Blocking mode: wait for the router to free some chunks. */
        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&lib->outgoing.mutex);

        goto retry;
    }

    /* Below the segment limit: create a new mmap with *n chunks busy. */
    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, port, *n);

unlock:

    nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "allocated_chunks %d",
                   (int) lib->outgoing.allocated_chunks);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    return hdr;
}


/* Sends an out-of-shared-memory notification to the router. */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Blocks reading the context port until a shared memory ACK arrives.
 * Messages received in the meantime are queued on pending_rbuf for later
 * processing.  Returns NXT_UNIT_ERROR on receive failure or QUIT.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  res;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_ERROR) {
            nxt_unit_read_buf_release(ctx, rbuf);

            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_shm_ack(rbuf)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }
        /* Not an ACK: park the message for the regular processing loop. */
        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        if (nxt_unit_is_quit(rbuf)) {
            nxt_unit_debug(ctx, "oosm: quit received");

            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Returns a pointer to element "i" of the mmaps array, growing the array
 * (1.5x/2x policy) as needed.  Returns NULL on realloc failure.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t         cap, n;
    nxt_unit_mmap_t  *e;

    if (nxt_fast_path(mmaps->size > i)) {
        return mmaps->elts + i;
    }

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    /* Grow capacity: double while small, then by 1.5x. */
    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
        if (nxt_slow_path(e == NULL)) {
            return NULL;
        }

        mmaps->elts = e;

        /* Initialize the newly added slots. */
        for (n = mmaps->cap; n < cap; n++) {
            e = mmaps->elts + n;

            e->hdr = NULL;
            nxt_queue_init(&e->awaiting_rbuf);
        }

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Creates a new outgoing shared memory segment, marks its first "n"
 * chunks busy, and sends the segment's fd to the peer port.  Called and
 * returns with outgoing.mutex held (dropped around the fd send).
 * Returns NULL on failure.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All chunks start free (all bits set). */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = lib->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = port->id.pid;
    hdr->sent_over = port->id.id;
    mm->src_thread = pthread_self();

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* Drop the lock while passing the fd to the peer. */
    pthread_mutex_unlock(&lib->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) port->id.pid);
    }

    /* The mapping stays alive after close; the fd is no longer needed. */
    nxt_unit_close(fd);

    pthread_mutex_lock(&lib->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    lib->outgoing.size--;

    return NULL;
}


/*
 * Creates an anonymous shared memory object of "size" bytes and returns
 * its file descriptor, using memfd_create, shm_open(SHM_ANON) or a
 * create-then-unlink shm_open, whichever the platform provides.
 * Returns -1 on failure.
 */
static int
nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
{
    int              fd;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
    char    name[64];

    /* pid + thread pointer makes the name unique enough per creator. */
    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());
#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name,
MFD_CLOEXEC); 3765 if (nxt_slow_path(fd == -1)) { 3766 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3767 strerror(errno), errno); 3768 3769 return -1; 3770 } 3771 3772 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3773 3774 #elif (NXT_HAVE_SHM_OPEN_ANON) 3775 3776 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3777 if (nxt_slow_path(fd == -1)) { 3778 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3779 strerror(errno), errno); 3780 3781 return -1; 3782 } 3783 3784 #elif (NXT_HAVE_SHM_OPEN) 3785 3786 /* Just in case. */ 3787 shm_unlink(name); 3788 3789 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3790 if (nxt_slow_path(fd == -1)) { 3791 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3792 strerror(errno), errno); 3793 3794 return -1; 3795 } 3796 3797 if (nxt_slow_path(shm_unlink(name) == -1)) { 3798 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3799 strerror(errno), errno); 3800 } 3801 3802 #else 3803 3804 #error No working shared memory implementation. 3805 3806 #endif 3807 3808 if (nxt_slow_path(ftruncate(fd, size) == -1)) { 3809 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3810 strerror(errno), errno); 3811 3812 nxt_unit_close(fd); 3813 3814 return -1; 3815 } 3816 3817 return fd; 3818 } 3819 3820 3821 static int 3822 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) 3823 { 3824 ssize_t res; 3825 nxt_port_msg_t msg; 3826 nxt_unit_impl_t *lib; 3827 union { 3828 struct cmsghdr cm; 3829 char space[CMSG_SPACE(sizeof(int))]; 3830 } cmsg; 3831 3832 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3833 3834 msg.stream = 0; 3835 msg.pid = lib->pid; 3836 msg.reply_port = 0; 3837 msg.type = _NXT_PORT_MSG_MMAP; 3838 msg.last = 0; 3839 msg.mmap = 0; 3840 msg.nf = 0; 3841 msg.mf = 0; 3842 msg.tracking = 0; 3843 3844 /* 3845 * Fill all padding fields with 0. 3846 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 
3847 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 3848 */ 3849 memset(&cmsg, 0, sizeof(cmsg)); 3850 3851 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3852 cmsg.cm.cmsg_level = SOL_SOCKET; 3853 cmsg.cm.cmsg_type = SCM_RIGHTS; 3854 3855 /* 3856 * memcpy() is used instead of simple 3857 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3858 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3859 * dereferencing type-punned pointer will break strict-aliasing rules 3860 * 3861 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3862 * in the same simple assignment as in the code above. 3863 */ 3864 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3865 3866 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), 3867 &cmsg, sizeof(cmsg)); 3868 if (nxt_slow_path(res != sizeof(msg))) { 3869 return NXT_UNIT_ERROR; 3870 } 3871 3872 return NXT_UNIT_OK; 3873 } 3874 3875 3876 static int 3877 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 3878 uint32_t size, uint32_t min_size, 3879 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) 3880 { 3881 int nchunks, min_nchunks; 3882 nxt_chunk_id_t c; 3883 nxt_port_mmap_header_t *hdr; 3884 3885 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { 3886 if (local_buf != NULL) { 3887 mmap_buf->free_ptr = NULL; 3888 mmap_buf->plain_ptr = local_buf; 3889 3890 } else { 3891 mmap_buf->free_ptr = nxt_unit_malloc(ctx, 3892 size + sizeof(nxt_port_msg_t)); 3893 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3894 return NXT_UNIT_ERROR; 3895 } 3896 3897 mmap_buf->plain_ptr = mmap_buf->free_ptr; 3898 } 3899 3900 mmap_buf->hdr = NULL; 3901 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); 3902 mmap_buf->buf.free = mmap_buf->buf.start; 3903 mmap_buf->buf.end = mmap_buf->buf.start + size; 3904 3905 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", 3906 mmap_buf->buf.start, (int) size); 3907 3908 return NXT_UNIT_OK; 3909 } 3910 3911 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / 
PORT_MMAP_CHUNK_SIZE; 3912 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3913 3914 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3915 if (nxt_slow_path(hdr == NULL)) { 3916 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3917 mmap_buf->hdr = NULL; 3918 mmap_buf->buf.start = NULL; 3919 mmap_buf->buf.free = NULL; 3920 mmap_buf->buf.end = NULL; 3921 mmap_buf->free_ptr = NULL; 3922 3923 return NXT_UNIT_OK; 3924 } 3925 3926 return NXT_UNIT_ERROR; 3927 } 3928 3929 mmap_buf->hdr = hdr; 3930 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3931 mmap_buf->buf.free = mmap_buf->buf.start; 3932 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3933 mmap_buf->free_ptr = NULL; 3934 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3935 3936 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3937 (int) hdr->id, (int) c, 3938 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3939 3940 return NXT_UNIT_OK; 3941 } 3942 3943 3944 static int 3945 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3946 { 3947 int rc; 3948 void *mem; 3949 nxt_queue_t awaiting_rbuf; 3950 struct stat mmap_stat; 3951 nxt_unit_mmap_t *mm; 3952 nxt_unit_impl_t *lib; 3953 nxt_unit_ctx_impl_t *ctx_impl; 3954 nxt_unit_read_buf_t *rbuf; 3955 nxt_port_mmap_header_t *hdr; 3956 3957 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3958 3959 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3960 3961 if (fstat(fd, &mmap_stat) == -1) { 3962 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3963 strerror(errno), errno); 3964 3965 return NXT_UNIT_ERROR; 3966 } 3967 3968 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3969 MAP_SHARED, fd, 0); 3970 if (nxt_slow_path(mem == MAP_FAILED)) { 3971 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3972 strerror(errno), errno); 3973 3974 return NXT_UNIT_ERROR; 3975 } 3976 3977 
hdr = mem; 3978 3979 if (nxt_slow_path(hdr->src_pid != pid)) { 3980 3981 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " 3982 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3983 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3984 3985 munmap(mem, PORT_MMAP_SIZE); 3986 3987 return NXT_UNIT_ERROR; 3988 } 3989 3990 nxt_queue_init(&awaiting_rbuf); 3991 3992 pthread_mutex_lock(&lib->incoming.mutex); 3993 3994 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); 3995 if (nxt_slow_path(mm == NULL)) { 3996 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); 3997 3998 munmap(mem, PORT_MMAP_SIZE); 3999 4000 rc = NXT_UNIT_ERROR; 4001 4002 } else { 4003 mm->hdr = hdr; 4004 4005 hdr->sent_over = 0xFFFFu; 4006 4007 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 4008 nxt_queue_init(&mm->awaiting_rbuf); 4009 4010 rc = NXT_UNIT_OK; 4011 } 4012 4013 pthread_mutex_unlock(&lib->incoming.mutex); 4014 4015 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 4016 4017 ctx_impl = rbuf->ctx_impl; 4018 4019 pthread_mutex_lock(&ctx_impl->mutex); 4020 4021 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 4022 4023 pthread_mutex_unlock(&ctx_impl->mutex); 4024 4025 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 4026 4027 nxt_unit_awake_ctx(ctx, ctx_impl); 4028 4029 } nxt_queue_loop; 4030 4031 return rc; 4032 } 4033 4034 4035 static void 4036 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) 4037 { 4038 nxt_port_msg_t msg; 4039 4040 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { 4041 return; 4042 } 4043 4044 if (nxt_slow_path(ctx_impl->read_port == NULL 4045 || ctx_impl->read_port->out_fd == -1)) 4046 { 4047 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); 4048 4049 return; 4050 } 4051 4052 memset(&msg, 0, sizeof(nxt_port_msg_t)); 4053 4054 msg.type = _NXT_PORT_MSG_RPC_READY; 4055 4056 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 4057 &msg, sizeof(msg), NULL, 0); 4058 } 4059 
4060 4061 static void 4062 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 4063 { 4064 pthread_mutex_init(&mmaps->mutex, NULL); 4065 4066 mmaps->size = 0; 4067 mmaps->cap = 0; 4068 mmaps->elts = NULL; 4069 mmaps->allocated_chunks = 0; 4070 } 4071 4072 4073 nxt_inline void 4074 nxt_unit_process_use(nxt_unit_process_t *process) 4075 { 4076 nxt_atomic_fetch_add(&process->use_count, 1); 4077 } 4078 4079 4080 nxt_inline void 4081 nxt_unit_process_release(nxt_unit_process_t *process) 4082 { 4083 long c; 4084 4085 c = nxt_atomic_fetch_add(&process->use_count, -1); 4086 4087 if (c == 1) { 4088 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); 4089 4090 nxt_unit_free(NULL, process); 4091 } 4092 } 4093 4094 4095 static void 4096 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) 4097 { 4098 nxt_unit_mmap_t *mm, *end; 4099 4100 if (mmaps->elts != NULL) { 4101 end = mmaps->elts + mmaps->size; 4102 4103 for (mm = mmaps->elts; mm < end; mm++) { 4104 munmap(mm->hdr, PORT_MMAP_SIZE); 4105 } 4106 4107 nxt_unit_free(NULL, mmaps->elts); 4108 } 4109 4110 pthread_mutex_destroy(&mmaps->mutex); 4111 } 4112 4113 4114 static int 4115 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, 4116 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, 4117 nxt_unit_read_buf_t *rbuf) 4118 { 4119 int res, need_rbuf; 4120 nxt_unit_mmap_t *mm; 4121 nxt_unit_ctx_impl_t *ctx_impl; 4122 4123 mm = nxt_unit_mmap_at(mmaps, id); 4124 if (nxt_slow_path(mm == NULL)) { 4125 nxt_unit_alert(ctx, "failed to allocate mmap"); 4126 4127 pthread_mutex_unlock(&mmaps->mutex); 4128 4129 *hdr = NULL; 4130 4131 return NXT_UNIT_ERROR; 4132 } 4133 4134 *hdr = mm->hdr; 4135 4136 if (nxt_fast_path(*hdr != NULL)) { 4137 return NXT_UNIT_OK; 4138 } 4139 4140 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); 4141 4142 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); 4143 4144 pthread_mutex_unlock(&mmaps->mutex); 4145 4146 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4147 4148 
nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 4149 4150 if (need_rbuf) { 4151 res = nxt_unit_get_mmap(ctx, pid, id); 4152 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 4153 return NXT_UNIT_ERROR; 4154 } 4155 } 4156 4157 return NXT_UNIT_AGAIN; 4158 } 4159 4160 4161 static int 4162 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, 4163 nxt_unit_read_buf_t *rbuf) 4164 { 4165 int res; 4166 void *start; 4167 uint32_t size; 4168 nxt_unit_impl_t *lib; 4169 nxt_unit_mmaps_t *mmaps; 4170 nxt_unit_mmap_buf_t *b, **incoming_tail; 4171 nxt_port_mmap_msg_t *mmap_msg, *end; 4172 nxt_port_mmap_header_t *hdr; 4173 4174 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 4175 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 4176 recv_msg->stream, (int) recv_msg->size); 4177 4178 return NXT_UNIT_ERROR; 4179 } 4180 4181 mmap_msg = recv_msg->start; 4182 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 4183 4184 incoming_tail = &recv_msg->incoming_buf; 4185 4186 /* Allocating buffer structures. 
*/ 4187 for (; mmap_msg < end; mmap_msg++) { 4188 b = nxt_unit_mmap_buf_get(ctx); 4189 if (nxt_slow_path(b == NULL)) { 4190 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 4191 recv_msg->stream); 4192 4193 while (recv_msg->incoming_buf != NULL) { 4194 nxt_unit_mmap_buf_release(recv_msg->incoming_buf); 4195 } 4196 4197 return NXT_UNIT_ERROR; 4198 } 4199 4200 nxt_unit_mmap_buf_insert(incoming_tail, b); 4201 incoming_tail = &b->next; 4202 } 4203 4204 b = recv_msg->incoming_buf; 4205 mmap_msg = recv_msg->start; 4206 4207 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4208 4209 mmaps = &lib->incoming; 4210 4211 pthread_mutex_lock(&mmaps->mutex); 4212 4213 for (; mmap_msg < end; mmap_msg++) { 4214 res = nxt_unit_check_rbuf_mmap(ctx, mmaps, 4215 recv_msg->pid, mmap_msg->mmap_id, 4216 &hdr, rbuf); 4217 4218 if (nxt_slow_path(res != NXT_UNIT_OK)) { 4219 while (recv_msg->incoming_buf != NULL) { 4220 nxt_unit_mmap_buf_release(recv_msg->incoming_buf); 4221 } 4222 4223 return res; 4224 } 4225 4226 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 4227 size = mmap_msg->size; 4228 4229 if (recv_msg->start == mmap_msg) { 4230 recv_msg->start = start; 4231 recv_msg->size = size; 4232 } 4233 4234 b->buf.start = start; 4235 b->buf.free = start; 4236 b->buf.end = b->buf.start + size; 4237 b->hdr = hdr; 4238 4239 b = b->next; 4240 4241 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 4242 recv_msg->stream, 4243 start, (int) size, 4244 (int) hdr->src_pid, (int) hdr->dst_pid, 4245 (int) hdr->id, (int) mmap_msg->chunk_id, 4246 (int) mmap_msg->size); 4247 } 4248 4249 pthread_mutex_unlock(&mmaps->mutex); 4250 4251 return NXT_UNIT_OK; 4252 } 4253 4254 4255 static int 4256 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) 4257 { 4258 ssize_t res; 4259 nxt_unit_impl_t *lib; 4260 nxt_unit_ctx_impl_t *ctx_impl; 4261 4262 struct { 4263 nxt_port_msg_t msg; 4264 nxt_port_msg_get_mmap_t get_mmap; 4265 } m; 4266 4267 lib = 
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4268 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4269 4270 memset(&m.msg, 0, sizeof(nxt_port_msg_t)); 4271 4272 m.msg.pid = lib->pid; 4273 m.msg.reply_port = ctx_impl->read_port->id.id; 4274 m.msg.type = _NXT_PORT_MSG_GET_MMAP; 4275 4276 m.get_mmap.id = id; 4277 4278 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); 4279 4280 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); 4281 if (nxt_slow_path(res != sizeof(m))) { 4282 return NXT_UNIT_ERROR; 4283 } 4284 4285 return NXT_UNIT_OK; 4286 } 4287 4288 4289 static void 4290 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, 4291 void *start, uint32_t size) 4292 { 4293 int freed_chunks; 4294 u_char *p, *end; 4295 nxt_chunk_id_t c; 4296 nxt_unit_impl_t *lib; 4297 4298 memset(start, 0xA5, size); 4299 4300 p = start; 4301 end = p + size; 4302 c = nxt_port_mmap_chunk_id(hdr, p); 4303 freed_chunks = 0; 4304 4305 while (p < end) { 4306 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 4307 4308 p += PORT_MMAP_CHUNK_SIZE; 4309 c++; 4310 freed_chunks++; 4311 } 4312 4313 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4314 4315 if (hdr->src_pid == lib->pid && freed_chunks != 0) { 4316 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); 4317 4318 nxt_unit_debug(ctx, "allocated_chunks %d", 4319 (int) lib->outgoing.allocated_chunks); 4320 } 4321 4322 if (hdr->dst_pid == lib->pid 4323 && freed_chunks != 0 4324 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 4325 { 4326 nxt_unit_send_shm_ack(ctx, hdr->src_pid); 4327 } 4328 } 4329 4330 4331 static int 4332 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) 4333 { 4334 ssize_t res; 4335 nxt_port_msg_t msg; 4336 nxt_unit_impl_t *lib; 4337 4338 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4339 4340 msg.stream = 0; 4341 msg.pid = lib->pid; 4342 msg.reply_port = 0; 4343 msg.type = _NXT_PORT_MSG_SHM_ACK; 4344 msg.last = 0; 4345 
msg.mmap = 0; 4346 msg.nf = 0; 4347 msg.mf = 0; 4348 msg.tracking = 0; 4349 4350 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); 4351 if (nxt_slow_path(res != sizeof(msg))) { 4352 return NXT_UNIT_ERROR; 4353 } 4354 4355 return NXT_UNIT_OK; 4356 } 4357 4358 4359 static nxt_int_t 4360 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 4361 { 4362 nxt_process_t *process; 4363 4364 process = data; 4365 4366 if (lhq->key.length == sizeof(pid_t) 4367 && *(pid_t *) lhq->key.start == process->pid) 4368 { 4369 return NXT_OK; 4370 } 4371 4372 return NXT_DECLINED; 4373 } 4374 4375 4376 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 4377 NXT_LVLHSH_DEFAULT, 4378 nxt_unit_lvlhsh_pid_test, 4379 nxt_unit_lvlhsh_alloc, 4380 nxt_unit_lvlhsh_free, 4381 }; 4382 4383 4384 static inline void 4385 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 4386 { 4387 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 4388 lhq->key.length = sizeof(*pid); 4389 lhq->key.start = (u_char *) pid; 4390 lhq->proto = &lvlhsh_processes_proto; 4391 } 4392 4393 4394 static nxt_unit_process_t * 4395 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 4396 { 4397 nxt_unit_impl_t *lib; 4398 nxt_unit_process_t *process; 4399 nxt_lvlhsh_query_t lhq; 4400 4401 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4402 4403 nxt_unit_process_lhq_pid(&lhq, &pid); 4404 4405 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 4406 process = lhq.value; 4407 nxt_unit_process_use(process); 4408 4409 return process; 4410 } 4411 4412 process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t)); 4413 if (nxt_slow_path(process == NULL)) { 4414 nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid); 4415 4416 return NULL; 4417 } 4418 4419 process->pid = pid; 4420 process->use_count = 2; 4421 process->next_port_id = 0; 4422 process->lib = lib; 4423 4424 nxt_queue_init(&process->ports); 4425 4426 lhq.replace = 0; 4427 
lhq.value = process; 4428 4429 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 4430 4431 case NXT_OK: 4432 break; 4433 4434 default: 4435 nxt_unit_alert(ctx, "process %d insert failed", (int) pid); 4436 4437 nxt_unit_free(ctx, process); 4438 process = NULL; 4439 break; 4440 } 4441 4442 return process; 4443 } 4444 4445 4446 static nxt_unit_process_t * 4447 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) 4448 { 4449 int rc; 4450 nxt_lvlhsh_query_t lhq; 4451 4452 nxt_unit_process_lhq_pid(&lhq, &pid); 4453 4454 if (remove) { 4455 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 4456 4457 } else { 4458 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 4459 } 4460 4461 if (rc == NXT_OK) { 4462 if (!remove) { 4463 nxt_unit_process_use(lhq.value); 4464 } 4465 4466 return lhq.value; 4467 } 4468 4469 return NULL; 4470 } 4471 4472 4473 static nxt_unit_process_t * 4474 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 4475 { 4476 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 4477 } 4478 4479 4480 int 4481 nxt_unit_run(nxt_unit_ctx_t *ctx) 4482 { 4483 int rc; 4484 nxt_unit_ctx_impl_t *ctx_impl; 4485 4486 nxt_unit_ctx_use(ctx); 4487 4488 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4489 4490 rc = NXT_UNIT_OK; 4491 4492 while (nxt_fast_path(ctx_impl->online)) { 4493 rc = nxt_unit_run_once_impl(ctx); 4494 4495 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4496 nxt_unit_quit(ctx); 4497 break; 4498 } 4499 } 4500 4501 nxt_unit_ctx_release(ctx); 4502 4503 return rc; 4504 } 4505 4506 4507 int 4508 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 4509 { 4510 int rc; 4511 4512 nxt_unit_ctx_use(ctx); 4513 4514 rc = nxt_unit_run_once_impl(ctx); 4515 4516 nxt_unit_ctx_release(ctx); 4517 4518 return rc; 4519 } 4520 4521 4522 static int 4523 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) 4524 { 4525 int rc; 4526 nxt_unit_read_buf_t *rbuf; 4527 4528 rbuf = nxt_unit_read_buf_get(ctx); 4529 if (nxt_slow_path(rbuf == NULL)) { 4530 return NXT_UNIT_ERROR; 
4531 } 4532 4533 rc = nxt_unit_read_buf(ctx, rbuf); 4534 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4535 nxt_unit_read_buf_release(ctx, rbuf); 4536 4537 return rc; 4538 } 4539 4540 rc = nxt_unit_process_msg(ctx, rbuf); 4541 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4542 return NXT_UNIT_ERROR; 4543 } 4544 4545 rc = nxt_unit_process_pending_rbuf(ctx); 4546 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4547 return NXT_UNIT_ERROR; 4548 } 4549 4550 nxt_unit_process_ready_req(ctx); 4551 4552 return rc; 4553 } 4554 4555 4556 static int 4557 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) 4558 { 4559 int nevents, res, err; 4560 nxt_unit_impl_t *lib; 4561 nxt_unit_ctx_impl_t *ctx_impl; 4562 nxt_unit_port_impl_t *port_impl; 4563 struct pollfd fds[2]; 4564 4565 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4566 4567 if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { 4568 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); 4569 } 4570 4571 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, 4572 port); 4573 4574 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4575 4576 retry: 4577 4578 if (port_impl->from_socket == 0) { 4579 res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); 4580 if (res == NXT_UNIT_OK) { 4581 if (nxt_unit_is_read_socket(rbuf)) { 4582 port_impl->from_socket++; 4583 4584 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", 4585 (int) ctx_impl->read_port->id.pid, 4586 (int) ctx_impl->read_port->id.id, 4587 port_impl->from_socket); 4588 4589 } else { 4590 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", 4591 (int) ctx_impl->read_port->id.pid, 4592 (int) ctx_impl->read_port->id.id, 4593 (int) rbuf->size); 4594 4595 return NXT_UNIT_OK; 4596 } 4597 } 4598 } 4599 4600 res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); 4601 if (res == NXT_UNIT_OK) { 4602 return NXT_UNIT_OK; 4603 } 4604 4605 fds[0].fd = ctx_impl->read_port->in_fd; 4606 fds[0].events = POLLIN; 4607 fds[0].revents = 
0; 4608 4609 fds[1].fd = lib->shared_port->in_fd; 4610 fds[1].events = POLLIN; 4611 fds[1].revents = 0; 4612 4613 nevents = poll(fds, 2, -1); 4614 if (nxt_slow_path(nevents == -1)) { 4615 err = errno; 4616 4617 if (err == EINTR) { 4618 goto retry; 4619 } 4620 4621 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", 4622 fds[0].fd, fds[1].fd, strerror(err), err); 4623 4624 rbuf->size = -1; 4625 4626 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; 4627 } 4628 4629 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", 4630 fds[0].fd, fds[1].fd, nevents, fds[0].revents, 4631 fds[1].revents); 4632 4633 if ((fds[0].revents & POLLIN) != 0) { 4634 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); 4635 if (res == NXT_UNIT_AGAIN) { 4636 goto retry; 4637 } 4638 4639 return res; 4640 } 4641 4642 if ((fds[1].revents & POLLIN) != 0) { 4643 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); 4644 if (res == NXT_UNIT_AGAIN) { 4645 goto retry; 4646 } 4647 4648 return res; 4649 } 4650 4651 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", 4652 fds[0].fd, fds[1].fd, nevents, fds[0].revents, 4653 fds[1].revents); 4654 4655 return NXT_UNIT_ERROR; 4656 } 4657 4658 4659 static int 4660 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) 4661 { 4662 int rc; 4663 nxt_queue_t pending_rbuf; 4664 nxt_unit_ctx_impl_t *ctx_impl; 4665 nxt_unit_read_buf_t *rbuf; 4666 4667 nxt_queue_init(&pending_rbuf); 4668 4669 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4670 4671 pthread_mutex_lock(&ctx_impl->mutex); 4672 4673 if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { 4674 pthread_mutex_unlock(&ctx_impl->mutex); 4675 4676 return NXT_UNIT_OK; 4677 } 4678 4679 nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); 4680 nxt_queue_init(&ctx_impl->pending_rbuf); 4681 4682 pthread_mutex_unlock(&ctx_impl->mutex); 4683 4684 rc = NXT_UNIT_OK; 4685 4686 nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { 4687 4688 if 
(nxt_fast_path(rc != NXT_UNIT_ERROR)) { 4689 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); 4690 4691 } else { 4692 nxt_unit_read_buf_release(ctx, rbuf); 4693 } 4694 4695 } nxt_queue_loop; 4696 4697 return rc; 4698 } 4699 4700 4701 static void 4702 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) 4703 { 4704 int res; 4705 nxt_queue_t ready_req; 4706 nxt_unit_impl_t *lib; 4707 nxt_unit_ctx_impl_t *ctx_impl; 4708 nxt_unit_request_info_t *req; 4709 nxt_unit_request_info_impl_t *req_impl; 4710 4711 nxt_queue_init(&ready_req); 4712 4713 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4714 4715 pthread_mutex_lock(&ctx_impl->mutex); 4716 4717 if (nxt_queue_is_empty(&ctx_impl->ready_req)) { 4718 pthread_mutex_unlock(&ctx_impl->mutex); 4719 4720 return; 4721 } 4722 4723 nxt_queue_add(&ready_req, &ctx_impl->ready_req); 4724 nxt_queue_init(&ctx_impl->ready_req); 4725 4726 pthread_mutex_unlock(&ctx_impl->mutex); 4727 4728 nxt_queue_each(req_impl, &ready_req, 4729 nxt_unit_request_info_impl_t, port_wait_link) 4730 { 4731 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); 4732 4733 req = &req_impl->req; 4734 4735 res = nxt_unit_send_req_headers_ack(req); 4736 if (nxt_slow_path(res != NXT_UNIT_OK)) { 4737 nxt_unit_request_done(req, NXT_UNIT_ERROR); 4738 4739 continue; 4740 } 4741 4742 if (req->content_length 4743 > (uint64_t) (req->content_buf->end - req->content_buf->free)) 4744 { 4745 res = nxt_unit_request_hash_add(ctx, req); 4746 if (nxt_slow_path(res != NXT_UNIT_OK)) { 4747 nxt_unit_req_warn(req, "failed to add request to hash"); 4748 4749 nxt_unit_request_done(req, NXT_UNIT_ERROR); 4750 4751 continue; 4752 } 4753 4754 /* 4755 * If application have separate data handler, we may start 4756 * request processing and process data when it is arrived. 
4757 */ 4758 if (lib->callbacks.data_handler == NULL) { 4759 continue; 4760 } 4761 } 4762 4763 lib->callbacks.request_handler(&req_impl->req); 4764 4765 } nxt_queue_loop; 4766 } 4767 4768 4769 int 4770 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) 4771 { 4772 int rc; 4773 nxt_unit_read_buf_t *rbuf; 4774 nxt_unit_ctx_impl_t *ctx_impl; 4775 4776 nxt_unit_ctx_use(ctx); 4777 4778 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4779 4780 rc = NXT_UNIT_OK; 4781 4782 while (nxt_fast_path(ctx_impl->online)) { 4783 rbuf = nxt_unit_read_buf_get(ctx); 4784 if (nxt_slow_path(rbuf == NULL)) { 4785 rc = NXT_UNIT_ERROR; 4786 break; 4787 } 4788 4789 retry: 4790 4791 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); 4792 if (rc == NXT_UNIT_AGAIN) { 4793 goto retry; 4794 } 4795 4796 rc = nxt_unit_process_msg(ctx, rbuf); 4797 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4798 break; 4799 } 4800 4801 rc = nxt_unit_process_pending_rbuf(ctx); 4802 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4803 break; 4804 } 4805 4806 nxt_unit_process_ready_req(ctx); 4807 } 4808 4809 nxt_unit_ctx_release(ctx); 4810 4811 return rc; 4812 } 4813 4814 4815 nxt_inline int 4816 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) 4817 { 4818 nxt_port_msg_t *port_msg; 4819 4820 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { 4821 port_msg = (nxt_port_msg_t *) rbuf->buf; 4822 4823 return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; 4824 } 4825 4826 return 0; 4827 } 4828 4829 4830 nxt_inline int 4831 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) 4832 { 4833 if (nxt_fast_path(rbuf->size == 1)) { 4834 return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; 4835 } 4836 4837 return 0; 4838 } 4839 4840 4841 nxt_inline int 4842 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) 4843 { 4844 nxt_port_msg_t *port_msg; 4845 4846 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { 4847 port_msg = (nxt_port_msg_t *) rbuf->buf; 4848 4849 return port_msg->type == _NXT_PORT_MSG_SHM_ACK; 
4850 } 4851 4852 return 0; 4853 } 4854 4855 4856 nxt_inline int 4857 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) 4858 { 4859 nxt_port_msg_t *port_msg; 4860 4861 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { 4862 port_msg = (nxt_port_msg_t *) rbuf->buf; 4863 4864 return port_msg->type == _NXT_PORT_MSG_QUIT; 4865 } 4866 4867 return 0; 4868 } 4869 4870 4871 int 4872 nxt_unit_run_shared(nxt_unit_ctx_t *ctx) 4873 { 4874 int rc; 4875 nxt_unit_impl_t *lib; 4876 nxt_unit_read_buf_t *rbuf; 4877 nxt_unit_ctx_impl_t *ctx_impl; 4878 4879 nxt_unit_ctx_use(ctx); 4880 4881 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4882 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4883 4884 rc = NXT_UNIT_OK; 4885 4886 while (nxt_fast_path(ctx_impl->online)) { 4887 rbuf = nxt_unit_read_buf_get(ctx); 4888 if (nxt_slow_path(rbuf == NULL)) { 4889 rc = NXT_UNIT_ERROR; 4890 break; 4891 } 4892 4893 retry: 4894 4895 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); 4896 if (rc == NXT_UNIT_AGAIN) { 4897 goto retry; 4898 } 4899 4900 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4901 nxt_unit_read_buf_release(ctx, rbuf); 4902 break; 4903 } 4904 4905 rc = nxt_unit_process_msg(ctx, rbuf); 4906 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4907 break; 4908 } 4909 4910 rc = nxt_unit_process_pending_rbuf(ctx); 4911 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4912 break; 4913 } 4914 4915 nxt_unit_process_ready_req(ctx); 4916 } 4917 4918 nxt_unit_ctx_release(ctx); 4919 4920 return rc; 4921 } 4922 4923 4924 int 4925 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) 4926 { 4927 nxt_unit_impl_t *lib; 4928 4929 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4930 4931 return (ctx == &lib->main_ctx.ctx); 4932 } 4933 4934 4935 int 4936 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4937 { 4938 int rc; 4939 4940 nxt_unit_ctx_use(ctx); 4941 4942 rc = nxt_unit_process_port_msg_impl(ctx, port); 4943 4944 nxt_unit_ctx_release(ctx); 4945 4946 
return rc; 4947 } 4948 4949 4950 static int 4951 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4952 { 4953 int rc; 4954 nxt_unit_impl_t *lib; 4955 nxt_unit_read_buf_t *rbuf; 4956 nxt_unit_ctx_impl_t *ctx_impl; 4957 4958 rbuf = nxt_unit_read_buf_get(ctx); 4959 if (nxt_slow_path(rbuf == NULL)) { 4960 return NXT_UNIT_ERROR; 4961 } 4962 4963 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4964 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4965 4966 retry: 4967 4968 if (port == lib->shared_port) { 4969 rc = nxt_unit_shared_port_recv(ctx, port, rbuf); 4970 4971 } else { 4972 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); 4973 } 4974 4975 if (rc != NXT_UNIT_OK) { 4976 nxt_unit_read_buf_release(ctx, rbuf); 4977 return rc; 4978 } 4979 4980 rc = nxt_unit_process_msg(ctx, rbuf); 4981 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4982 return NXT_UNIT_ERROR; 4983 } 4984 4985 rc = nxt_unit_process_pending_rbuf(ctx); 4986 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4987 return NXT_UNIT_ERROR; 4988 } 4989 4990 nxt_unit_process_ready_req(ctx); 4991 4992 rbuf = nxt_unit_read_buf_get(ctx); 4993 if (nxt_slow_path(rbuf == NULL)) { 4994 return NXT_UNIT_ERROR; 4995 } 4996 4997 if (ctx_impl->online) { 4998 goto retry; 4999 } 5000 5001 return rc; 5002 } 5003 5004 5005 void 5006 nxt_unit_done(nxt_unit_ctx_t *ctx) 5007 { 5008 nxt_unit_ctx_release(ctx); 5009 } 5010 5011 5012 nxt_unit_ctx_t * 5013 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) 5014 { 5015 int rc, queue_fd; 5016 void *mem; 5017 nxt_unit_impl_t *lib; 5018 nxt_unit_port_t *port; 5019 nxt_unit_ctx_impl_t *new_ctx; 5020 nxt_unit_port_impl_t *port_impl; 5021 5022 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5023 5024 new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t) 5025 + lib->request_data_size); 5026 if (nxt_slow_path(new_ctx == NULL)) { 5027 nxt_unit_alert(ctx, "failed to allocate context"); 5028 5029 return NULL; 5030 } 5031 5032 rc = 
nxt_unit_ctx_init(lib, new_ctx, data); 5033 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 5034 nxt_unit_free(ctx, new_ctx); 5035 5036 return NULL; 5037 } 5038 5039 queue_fd = -1; 5040 5041 port = nxt_unit_create_port(&new_ctx->ctx); 5042 if (nxt_slow_path(port == NULL)) { 5043 goto fail; 5044 } 5045 5046 new_ctx->read_port = port; 5047 5048 queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t)); 5049 if (nxt_slow_path(queue_fd == -1)) { 5050 goto fail; 5051 } 5052 5053 mem = mmap(NULL, sizeof(nxt_port_queue_t), 5054 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); 5055 if (nxt_slow_path(mem == MAP_FAILED)) { 5056 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, 5057 strerror(errno), errno); 5058 5059 goto fail; 5060 } 5061 5062 nxt_port_queue_init(mem); 5063 5064 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5065 port_impl->queue = mem; 5066 5067 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); 5068 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 5069 goto fail; 5070 } 5071 5072 nxt_unit_close(queue_fd); 5073 5074 return &new_ctx->ctx; 5075 5076 fail: 5077 5078 if (queue_fd != -1) { 5079 nxt_unit_close(queue_fd); 5080 } 5081 5082 nxt_unit_ctx_release(&new_ctx->ctx); 5083 5084 return NULL; 5085 } 5086 5087 5088 static void 5089 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) 5090 { 5091 nxt_unit_impl_t *lib; 5092 nxt_unit_mmap_buf_t *mmap_buf; 5093 nxt_unit_read_buf_t *rbuf; 5094 nxt_unit_request_info_impl_t *req_impl; 5095 nxt_unit_websocket_frame_impl_t *ws_impl; 5096 5097 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); 5098 5099 nxt_queue_each(req_impl, &ctx_impl->active_req, 5100 nxt_unit_request_info_impl_t, link) 5101 { 5102 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 5103 5104 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 5105 5106 } nxt_queue_loop; 5107 5108 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]); 5109 
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        nxt_unit_free(&ctx_impl->ctx, mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);

    } nxt_queue_loop;

    nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
    {
        /* ctx_read_buf is embedded in ctx_impl and must not be freed. */
        if (rbuf != &ctx_impl->ctx_read_buf) {
            nxt_unit_free(&ctx_impl->ctx, rbuf);
        }
    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_remove(&ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
        nxt_unit_port_release(ctx_impl->read_port);
    }

    /* The main context is embedded in lib and freed with it. */
    if (ctx_impl != &lib->main_ctx) {
        nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/*
 * SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms.
 * (Change the "0 ||" to force SOCK_DGRAM even where SEQPACKET exists.)
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/* Public API: fill in a port id and precompute its lookup hash. */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Create a new in-process port backed by a Unix socketpair and register
 * it in the library port hash.  Returns NULL on failure.
 */
static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port, *port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NULL;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    pthread_mutex_lock(&lib->mutex);

    /* Port ids are allocated per-process under the library mutex. */
    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);

        return NULL;
    }

    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    new_port.in_fd = port_sockets[0];
    new_port.out_fd = port_sockets[1];
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    nxt_unit_process_release(process);

    port = nxt_unit_add_port(ctx, &new_port, NULL);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);
    }

    return port;
}


/*
 * Send a "new port" message to "dst", passing the port's output fd and the
 * queue shared-memory fd as SCM_RIGHTS ancillary data.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *dst,
    nxt_unit_port_t *port, int queue_fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;
    int              fds[2] = { port->out_fd, queue_fd };

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_new_port_t  new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int) * 2)];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    m.msg.stream = 0;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    m.new_port.id = port->id.id;
    m.new_port.pid = port->id.pid;
    m.new_port.type = NXT_PROCESS_APP;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    memset(&cmsg, 0, sizeof(cmsg));

    /* Two descriptors travel in one SCM_RIGHTS control message. */
    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);

    res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));

    return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/* Take a reference on a port. */
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    nxt_atomic_fetch_add(&port_impl->use_count, 1);
}


/*
 * Drop a reference on a port; the last reference closes its descriptors,
 * unmaps its queue, releases its process, and frees the structure.
 */
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
{
    long                  c;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    c = nxt_atomic_fetch_add(&port_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
                       (int) port->id.pid, (int) port->id.id,
                       port->in_fd, port->out_fd);

        nxt_unit_process_release(port_impl->process);

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);

            port->in_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);

            port->out_fd = -1;
        }

        if (port_impl->queue != NULL) {
            /* The shared port maps an app queue; others a port queue. */
            munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
                                     ?
                                     sizeof(nxt_app_queue_t)
                                     : sizeof(nxt_port_queue_t));
        }

        nxt_unit_free(NULL, port_impl);
    }
}


/*
 * Register "port" in the library port hash.  If a port with the same id
 * already exists, the two are merged (missing fds/data/queue are filled in
 * from the new one) and the existing port is returned; otherwise a new
 * port_impl is created.  Requests waiting for the port become ready once
 * it is marked 'ready'.  Returns NULL on failure.
 */
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
    int                   rc, ready;
    nxt_queue_t           awaiting_req;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *old_port;
    nxt_unit_process_t    *process;
    nxt_unit_port_impl_t  *new_port, *old_port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);

    if (nxt_slow_path(old_port != NULL)) {
        nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
                       "in_fd %d out_fd %d queue %p",
                       port->id.pid, port->id.id,
                       port->in_fd, port->out_fd, queue);

        /* Merge: keep old fields, adopt new ones where old are missing. */
        if (old_port->data == NULL) {
            old_port->data = port->data;
            port->data = NULL;
        }

        if (old_port->in_fd == -1) {
            old_port->in_fd = port->in_fd;
            port->in_fd = -1;
        }

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);
            port->in_fd = -1;
        }

        if (old_port->out_fd == -1) {
            old_port->out_fd = port->out_fd;
            port->out_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);
            port->out_fd = -1;
        }

        *port = *old_port;

        nxt_queue_init(&awaiting_req);

        old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);

        if (old_port_impl->queue == NULL) {
            old_port_impl->queue = queue;
        }

        ready = (port->in_fd != -1 || port->out_fd != -1);

        /*
         * Port can be marked as 'ready' only after callbacks.add_port() call.
         * Otherwise, request may try to use the port before callback.
         */
        if (lib->callbacks.add_port == NULL && ready) {
            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }
        }

        pthread_mutex_unlock(&lib->mutex);

        if (lib->callbacks.add_port != NULL && ready) {
            /* The callback runs unlocked; re-lock to flip 'ready'. */
            lib->callbacks.add_port(ctx, old_port);

            pthread_mutex_lock(&lib->mutex);

            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }

            pthread_mutex_unlock(&lib->mutex);
        }

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);

        return old_port;
    }

    new_port = NULL;
    ready = 0;

    nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
                   port->id.pid, port->id.id,
                   port->in_fd, port->out_fd, queue);

    process = nxt_unit_process_get(ctx, port->id.pid);
    if (nxt_slow_path(process == NULL)) {
        goto unlock;
    }

    /* Keep next_port_id ahead of any externally announced port id. */
    if (port->id.id != NXT_UNIT_SHARED_PORT_ID
        && port->id.id >= process->next_port_id)
    {
        process->next_port_id = port->id.id + 1;
    }

    new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(new_port == NULL)) {
        nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
                       port->id.pid, port->id.id);

        goto unlock;
    }

    new_port->port = *port;

    rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        nxt_unit_free(ctx, new_port);

        new_port = NULL;

        goto unlock;
    }

    nxt_queue_insert_tail(&process->ports, &new_port->link);

    /* One reference for the hash, one for the returned pointer. */
    new_port->use_count = 2;
    new_port->process = process;
    new_port->queue = queue;
    new_port->from_socket = 0;
    new_port->socket_rbuf = NULL;

    nxt_queue_init(&new_port->awaiting_req);

    ready = (port->in_fd != -1 || port->out_fd != -1);

    if (lib->callbacks.add_port == NULL) {
        new_port->ready = ready;

    } else {
        new_port->ready = 0;
    }

    /* Ownership of the process reference moved to new_port. */
    process = NULL;

unlock:

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process != NULL)) {
        nxt_unit_process_release(process);
    }

    if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
        lib->callbacks.add_port(ctx, &new_port->port);

        nxt_queue_init(&awaiting_req);

        pthread_mutex_lock(&lib->mutex);

        new_port->ready = 1;

        if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
            nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
            nxt_queue_init(&new_port->awaiting_req);
        }

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);
    }

    return (new_port == NULL) ?
           NULL : &new_port->port;
}


/*
 * Move each request that was waiting for a port onto its own context's
 * ready queue and wake that context.
 */
static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    nxt_queue_each(req_impl, awaiting_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        nxt_queue_remove(&req_impl->port_wait_link);

        /* Requests may belong to contexts other than "ctx". */
        ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
                                    ctx);

        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->ready_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);

        nxt_unit_awake_ctx(ctx, ctx_impl);

    } nxt_queue_loop;
}


/*
 * Remove a port from the hash and process list, notify the application
 * callback, and drop the hash's reference.
 */
static void
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t       *port;
    nxt_unit_port_impl_t  *port_impl;

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_remove_port_unsafe(lib, port_id);

    if (nxt_fast_path(port != NULL)) {
        port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

        nxt_queue_remove(&port_impl->link);
    }

    pthread_mutex_unlock(&lib->mutex);

    /* The callback runs outside the library mutex. */
    if (lib->callbacks.remove_port != NULL && port != NULL) {
        lib->callbacks.remove_port(&lib->unit, port);
    }

    if (nxt_fast_path(port != NULL)) {
        nxt_unit_port_release(port);
    }
}


/*
 * Delete a port from the hash.  "unsafe": the caller must hold lib->mutex.
 * Returns the removed port (still referenced) or NULL if not found.
 */
static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t  *port;

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
                       (int) port_id->pid, (int) port_id->id);

        return NULL;
    }

    nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   port->in_fd, port->out_fd, port->data);

    return port;
}


/*
 * Remove a peer process and all of its ports.  Locks lib->mutex; the lock
 * is released inside nxt_unit_remove_process() (asymmetric on purpose).
 */
static void
nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
    nxt_unit_process_t  *process;

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(lib, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /* NOTE: unlocks lib->mutex before invoking port callbacks. */
    nxt_unit_remove_process(lib, process);

    if (lib->callbacks.remove_pid != NULL) {
        lib->callbacks.remove_pid(&lib->unit, pid);
    }
}


/*
 * Remove all ports of "process" and release it.  Called with lib->mutex
 * held; unlocks it once the hash updates are done so that remove_port
 * callbacks and releases run unlocked.
 */
static void
nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_port_impl_t  *port;

    nxt_queue_init(&ports);

    /* Steal the whole port list while still under the mutex. */
    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_remove_port_unsafe(lib, &port->port.id);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        if (lib->callbacks.remove_port != NULL) {
            lib->callbacks.remove_port(&lib->unit, &port->port);
        }

        nxt_unit_port_release(&port->port);

    } nxt_queue_loop;

    nxt_unit_process_release(process);
}


/*
 * Take this context offline.  The main context additionally forwards a
 * QUIT message to every other context's read port.
 */
static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_port_msg_t       msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (!ctx_impl->online) {
        return;
    }

    ctx_impl->online = 0;

    if (lib->callbacks.quit != NULL) {
        lib->callbacks.quit(ctx);
    }

    if (ctx !=
        &lib->main_ctx.ctx) {
        return;
    }

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.pid = lib->pid;
    msg.type = _NXT_PORT_MSG_QUIT;

    pthread_mutex_lock(&lib->mutex);

    /* Fan the QUIT out to every other context that can receive it. */
    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        if (ctx == &ctx_impl->ctx
            || ctx_impl->read_port == NULL
            || ctx_impl->read_port->out_fd == -1)
        {
            continue;
        }

        (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
                                  &msg, sizeof(msg), NULL, 0);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);
}


/*
 * Ask the router for a port, directing the reply to this context's
 * read port.  Returns NXT_UNIT_OK if the request message was sent.
 */
static int
nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_port_t  get_port;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_PORT;

    m.get_port.id = port_id->id;
    m.get_port.pid = port_id->pid;

    nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
                   (int) port_id->id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Send a message to a port: small messages without ancillary data go
 * through the shared-memory port queue; everything else goes via the
 * socket (or the application-supplied port_send callback).  Returns the
 * number of bytes sent, or -1 on error.
 */
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   notify;
    ssize_t               ret;
    nxt_int_t             rc;
    nxt_port_msg_t        msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    port_impl = nxt_container_of(port,
                                 nxt_unit_port_impl_t, port);
    if (port_impl->queue != NULL && oob_size == 0
        && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
    {
        rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) buf_size, notify);

        /*
         * If the queue was empty, wake the reader with a READ_QUEUE
         * message over the socket.
         */
        if (notify) {
            memcpy(&msg, buf, sizeof(nxt_port_msg_t));

            msg.type = _NXT_PORT_MSG_READ_QUEUE;

            if (lib->callbacks.port_send == NULL) {
                ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
                                       sizeof(nxt_port_msg_t), NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);

            } else {
                ret = lib->callbacks.port_send(ctx, port, &msg,
                                               sizeof(nxt_port_msg_t), NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);
            }

        }

        return buf_size;
    }

    /*
     * Message too large for the queue (or carries fds): enqueue a one-byte
     * READ_SOCKET marker so the reader keeps queue/socket ordering.
     */
    if (port_impl->queue != NULL) {
        msg.type = _NXT_PORT_MSG_READ_SOCKET;

        rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
                       (int) port->id.pid, (int) port->id.id, notify);
    }

    if (lib->callbacks.port_send != NULL) {
        ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
                                       oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);

    } else {
        ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
                               oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);
    }

    return ret;
}


/*
 * sendmsg() wrapper: one iovec plus optional control (oob) data.
 * Retries on EINTR; returns the sendmsg() result.
 */
static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int            err;
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

retry:

    res = sendmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        /*
         * FIXME: This should be "alert" after router graceful shutdown
         * implementation.
         */
        nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(err), err);

    } else {
        nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Receive the next message for a per-context port, preserving the order
 * between the shared-memory queue and the socket: READ_SOCKET markers in
 * the queue suspend queue reading until the matching socket message is
 * consumed (possibly parking it in socket_rbuf).
 */
static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int                   res, read;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    read = 0;

retry:

    if (port_impl->from_socket > 0) {
        /* A socket message was parked earlier; consume it first. */
        if (port_impl->socket_rbuf != NULL
            && port_impl->socket_rbuf->size > 0)
        {
            port_impl->from_socket--;

            nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
            port_impl->socket_rbuf->size = 0;

            nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }

    } else {
        res = nxt_unit_port_queue_recv(port, rbuf);

        if (res == NXT_UNIT_OK) {
            /* READ_SOCKET marker: next message must come from the socket. */
            if (nxt_unit_is_read_socket(rbuf)) {
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) port->id.pid, (int) port->id.id,
                               port_impl->from_socket);

                goto retry;
            }

            nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }
    }

    /* Avoid a second blocking socket read in one call. */
    if (read) {
        return NXT_UNIT_AGAIN;
    }

    res = nxt_unit_port_recv(ctx, port, rbuf);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    read = 1;

    /* READ_QUEUE wake-up: actual data is in the queue; re-poll it. */
    if (nxt_unit_is_read_queue(rbuf)) {
        nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                       (int) port->id.pid, (int) port->id.id, (int) rbuf->size);

        goto retry;
    }

    nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (res == NXT_UNIT_AGAIN) {
        return NXT_UNIT_AGAIN;
    }

    if (port_impl->from_socket > 0) {
        port_impl->from_socket--;

        return NXT_UNIT_OK;
    }

    /*
     * Socket message arrived before its READ_SOCKET marker was dequeued;
     * park it in socket_rbuf until the marker shows up.
     */
    nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (port_impl->socket_rbuf == NULL) {
        port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);

        if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        port_impl->socket_rbuf->size = 0;
    }

    /* Only one message may be parked at a time. */
    if (port_impl->socket_rbuf->size > 0) {
        nxt_unit_alert(ctx, "too many port socket messages");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);

    /* Clear the stale control data header before reusing rbuf. */
    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    goto retry;
}


/* Copy one read buffer's payload and ancillary data into another. */
nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
{
    memcpy(dst->buf, src->buf, src->size);
    dst->size = src->size;
    memcpy(dst->oob, src->oob, sizeof(src->oob));
}


/*
 * Receive from the shared application port: try the app queue first and
 * fall back to the socket; READ_QUEUE wake-ups loop back to the queue.
 */
static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int  res;

retry:

    res = nxt_unit_app_queue_recv(port, rbuf);

    if (res == NXT_UNIT_AGAIN) {
        res = nxt_unit_port_recv(ctx, port, rbuf);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_read_queue(rbuf)) {
            nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            goto retry;
        }
    }

    return res;
}


/*
 * Read one message from the port's socket (or the application-supplied
 * port_recv callback) into rbuf.  Returns NXT_UNIT_OK, NXT_UNIT_AGAIN
 * on EAGAIN, or NXT_UNIT_ERROR.
 */
static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int              fd, err;
    struct iovec     iov[1];
    struct msghdr    msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.port_recv != NULL) {
        rbuf->size = lib->callbacks.port_recv(ctx, port,
                                              rbuf->buf, sizeof(rbuf->buf),
                                              rbuf->oob, sizeof(rbuf->oob));

        nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) rbuf->size);

        if (nxt_slow_path(rbuf->size < 0)) {
            return NXT_UNIT_ERROR;
        }

        return NXT_UNIT_OK;
    }

    iov[0].iov_base = rbuf->buf;
    iov[0].iov_len = sizeof(rbuf->buf);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = rbuf->oob;
    msg.msg_controllen = sizeof(rbuf->oob);

    fd = port->in_fd;

retry:

    rbuf->size = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(rbuf->size == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        if (err == EAGAIN) {
            nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
                           fd, strerror(err), err);

            return NXT_UNIT_AGAIN;
        }

        nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
                       fd, strerror(err), err);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);

    return NXT_UNIT_OK;
}


/* Non-blocking dequeue from a per-context port queue. */
static int
nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);

    return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/*
 * Non-blocking dequeue from the shared application queue.  A dequeued
 * message must also win nxt_app_queue_cancel(); otherwise it was claimed
 * by another reader and the dequeue is retried.
 */
static int
nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    uint32_t              cookie;
    nxt_port_msg_t        *port_msg;
    nxt_app_queue_t       *queue;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    queue = port_impl->queue;

retry:

    rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);

    nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);

    if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
            return NXT_UNIT_OK;
        }

        nxt_unit_debug(NULL, "app_queue_recv: message cancelled");

        goto retry;
    }

    return (rbuf->size == -1) ?
           NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/* close() wrapper with debug/alert logging; returns the close() result. */
nxt_inline int
nxt_unit_close(int fd)
{
    int  res;

    res = close(fd);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
                       fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(NULL, "close(%d): %d", fd, res);
    }

    return res;
}


/* Switch a descriptor to blocking mode via FIONBIO. */
static int
nxt_unit_fd_blocking(int fd)
{
    int  nb;

    nb = 0;

    if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
        nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
                       fd, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/* lvlhsh test callback: match a port by (pid, id) key. */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Prepare an lvlhsh query for a port id, computing and caching the hash
 * in port_id if it has not been calculated yet.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/* Insert a port into the hash; duplicate keys fail as NXT_UNIT_ERROR. */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Look up (and optionally delete) a port by id.  A plain find takes a
 * new reference for the caller; a delete hands over the hash's reference.
 */
static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        if (!remove) {
            nxt_unit_port_use(lhq.value);
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * lvlhsh test callback for requests: always matches.  Presumably stream
 * ids are unique within a context so a full key comparison is unneeded —
 * confirm against stream id allocation.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Register a request in its context's hash, keyed by stream id.
 * Idempotent: already-hashed requests return NXT_UNIT_OK immediately.
 */
static int
nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
    nxt_unit_request_info_t *req)
{
    uint32_t                      *stream;
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req,
                                nxt_unit_request_info_impl_t, req);
    if (req_impl->in_hash) {
        return NXT_UNIT_OK;
    }

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl->in_hash = 1;
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Find (and optionally remove) a request by stream id in the context's
 * request hash.  Returns NULL if not found.
 */
static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (remove) {
        res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);

    } else {
        res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
                                    req);
        if (remove) {
            req_impl->in_hash = 0;
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Public API: format a log line with the standard prefix and write it to
 * the library log fd (or stderr when ctx is NULL).  Over-long messages
 * are truncated with a "[...]" marker.
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* vsnprintf() returns the untruncated length, so p may pass end. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Public API: like nxt_unit_log() but prefixes the message with the
 * request's stream id ("#<stream>: ").
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: 
%.*s", (int) (p - msg), msg); 6493 } 6494 } 6495 6496 6497 static const char * nxt_unit_log_levels[] = { 6498 "alert", 6499 "error", 6500 "warn", 6501 "notice", 6502 "info", 6503 "debug", 6504 }; 6505 6506 6507 static char * 6508 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 6509 { 6510 struct tm tm; 6511 struct timespec ts; 6512 6513 (void) clock_gettime(CLOCK_REALTIME, &ts); 6514 6515 #if (NXT_HAVE_LOCALTIME_R) 6516 (void) localtime_r(&ts.tv_sec, &tm); 6517 #else 6518 tm = *localtime(&ts.tv_sec); 6519 #endif 6520 6521 #if (NXT_DEBUG) 6522 p += snprintf(p, end - p, 6523 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 6524 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6525 tm.tm_hour, tm.tm_min, tm.tm_sec, 6526 (int) ts.tv_nsec / 1000000); 6527 #else 6528 p += snprintf(p, end - p, 6529 "%4d/%02d/%02d %02d:%02d:%02d ", 6530 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6531 tm.tm_hour, tm.tm_min, tm.tm_sec); 6532 #endif 6533 6534 p += snprintf(p, end - p, 6535 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 6536 (int) pid, 6537 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 6538 6539 return p; 6540 } 6541 6542 6543 static void * 6544 nxt_unit_lvlhsh_alloc(void *data, size_t size) 6545 { 6546 int err; 6547 void *p; 6548 6549 err = posix_memalign(&p, size, size); 6550 6551 if (nxt_fast_path(err == 0)) { 6552 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", 6553 (int) size, (int) size, p); 6554 return p; 6555 } 6556 6557 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", 6558 (int) size, (int) size, strerror(err), err); 6559 return NULL; 6560 } 6561 6562 6563 static void 6564 nxt_unit_lvlhsh_free(void *data, void *p) 6565 { 6566 nxt_unit_free(NULL, p); 6567 } 6568 6569 6570 void * 6571 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) 6572 { 6573 void *p; 6574 6575 p = malloc(size); 6576 6577 if (nxt_fast_path(p != NULL)) { 6578 #if (NXT_DEBUG_ALLOC) 6579 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); 6580 #endif 6581 6582 } else 
{ 6583 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", 6584 (int) size, strerror(errno), errno); 6585 } 6586 6587 return p; 6588 } 6589 6590 6591 void 6592 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) 6593 { 6594 #if (NXT_DEBUG_ALLOC) 6595 nxt_unit_debug(ctx, "free(%p)", p); 6596 #endif 6597 6598 free(p); 6599 } 6600 6601 6602 static int 6603 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) 6604 { 6605 u_char c1, c2; 6606 nxt_int_t n; 6607 const u_char *s1, *s2; 6608 6609 s1 = p1; 6610 s2 = p2; 6611 6612 while (length-- != 0) { 6613 c1 = *s1++; 6614 c2 = *s2++; 6615 6616 c1 = nxt_lowcase(c1); 6617 c2 = nxt_lowcase(c2); 6618 6619 n = c1 - c2; 6620 6621 if (n != 0) { 6622 return n; 6623 } 6624 } 6625 6626 return 0; 6627 } 6628