1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 #include "nxt_port_queue.h" 11 #include "nxt_app_queue.h" 12 13 #include "nxt_unit.h" 14 #include "nxt_unit_request.h" 15 #include "nxt_unit_response.h" 16 #include "nxt_unit_websocket.h" 17 18 #include "nxt_websocket.h" 19 20 #if (NXT_HAVE_MEMFD_CREATE) 21 #include <linux/memfd.h> 22 #endif 23 24 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 25 #define NXT_UNIT_LOCAL_BUF_SIZE \ 26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 27 28 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 29 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 30 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 31 typedef struct nxt_unit_process_s nxt_unit_process_t; 32 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 33 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 34 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 35 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 36 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 37 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 38 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 39 40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 42 nxt_unit_ctx_impl_t *ctx_impl, void *data); 43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 50 nxt_unit_mmap_buf_t *mmap_buf); 51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 54 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 56 int queue_fd); 57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 58 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 59 nxt_unit_recv_msg_t *recv_msg); 60 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 61 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 62 nxt_unit_recv_msg_t *recv_msg); 63 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 64 nxt_unit_recv_msg_t *recv_msg); 65 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 66 nxt_unit_port_id_t *port_id); 67 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 68 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 69 nxt_unit_recv_msg_t *recv_msg); 70 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 71 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 72 nxt_unit_ctx_t *ctx); 73 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 74 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 75 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 76 nxt_unit_ctx_t *ctx); 77 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 78 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 79 nxt_unit_websocket_frame_impl_t *ws); 80 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 81 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 82 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 83 nxt_unit_mmap_buf_t *mmap_buf, int last); 84 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 85 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 86 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t 
*ctx); 87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 88 nxt_unit_ctx_impl_t *ctx_impl); 89 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 90 nxt_unit_read_buf_t *rbuf); 91 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 92 nxt_unit_request_info_t *req, size_t size); 93 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 94 size_t size); 95 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 96 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 97 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 98 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 99 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 100 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 101 nxt_unit_port_t *port, int n); 102 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 103 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 104 int fd); 105 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 106 nxt_unit_port_t *port, uint32_t size, 107 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 108 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 109 110 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 111 nxt_unit_ctx_impl_t *ctx_impl); 112 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 113 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 114 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 115 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 116 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 117 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 118 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 119 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 120 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 121 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t 
pid, uint32_t id); 122 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 123 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 124 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 125 126 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 127 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 128 pid_t pid, int remove); 129 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 130 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 131 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 132 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 133 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 134 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 135 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 136 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 137 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 138 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 139 nxt_unit_port_t *port); 140 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 141 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 142 143 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 144 nxt_unit_port_t *port, int queue_fd); 145 146 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 147 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 148 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 149 nxt_unit_port_t *port, void *queue); 150 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 151 nxt_unit_port_id_t *port_id); 152 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 153 nxt_unit_port_id_t *port_id); 154 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 155 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 156 nxt_unit_process_t *process); 157 
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
    nxt_unit_port_t *port, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf);
nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
    nxt_unit_read_buf_t *src);
static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf);
static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf);
nxt_inline int nxt_unit_close(int fd);
static int nxt_unit_fd_blocking(int fd);

static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
    nxt_unit_request_info_t *req);
static nxt_unit_request_info_t *nxt_unit_request_hash_find(
    nxt_unit_ctx_t *ctx, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
static void nxt_unit_lvlhsh_free(void *data, void *p);
static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);


/*
 * Buffer wrapper linking the public nxt_unit_buf_t into an intrusive
 * doubly-linked list and tying it back to its shared-memory header,
 * owning request, and context.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    /* Address of the previous element's 'next' field (or the list head). */
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
    char                     *free_ptr;
    char                     *plain_ptr;
};


/* Decoded view of a single received port message. */
struct nxt_unit_recv_msg_s {
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;   /* 1 bit */
    uint8_t                  mmap;   /* 1 bit */

    /* Payload right after the nxt_port_msg_t header. */
    void                     *start;
    uint32_t                 size;

    /* File descriptors received as SCM_RIGHTS ancillary data; -1 if none. */
    int                      fd[2];

    nxt_unit_mmap_buf_t      *incoming_buf;
};


/* Response lifecycle state, used to validate API call ordering. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Library-private part of a request; the public part is embedded first. */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;
    uint8_t                  in_hash;

    /* for nxt_unit_ctx_impl_t.free_req or active_req */
    nxt_queue_link_t         link;
    /* for nxt_unit_port_impl_t.awaiting_req */
    nxt_queue_link_t         port_wait_link;

    /* Per-request application data (request_data_size bytes). */
    char                     extra_data[];
};


/* Library-private part of a websocket frame. */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/* Fixed-size receive buffer for one port message plus its ancillary data. */
struct nxt_unit_read_buf_s {
    nxt_queue_link_t     link;
    nxt_unit_ctx_impl_t  *ctx_impl;
    ssize_t              size;
    char                 buf[16384];
    char                 oob[256];
};


/* Library-private part of a context; the public part is embedded first. */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    nxt_atomic_t                  use_count;
    nxt_atomic_t                  wait_items;

    /* Protects the free lists and queues below. */
    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    /* for nxt_unit_impl_t.contexts */
    nxt_queue_link_t              link;

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   ready_req;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   pending_rbuf;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   free_rbuf;

    int                           online;
    int                           ready;

    /* Preallocated buffers so the common case avoids malloc(). */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    /* Preallocated request; seeded into free_req at init. */
    nxt_unit_request_info_impl_t  req;
};


/* One shared-memory segment and readers waiting for data in it. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t  *hdr;
    pthread_t               src_thread;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t             awaiting_rbuf;
};


/* Growable array of shared-memory segments (incoming or outgoing). */
struct nxt_unit_mmaps_s {
    pthread_mutex_t   mutex;
    uint32_t          size;
    uint32_t          cap;
    nxt_atomic_t      allocated_chunks;
    nxt_unit_mmap_t   *elts;
};


/* Library-private singleton behind the public nxt_unit_t. */
struct nxt_unit_impl_s {
    nxt_unit_t            unit;
    nxt_unit_callbacks_t  callbacks;

    nxt_atomic_t          use_count;

    uint32_t              request_data_size;
    /* Max outgoing shared-memory segments, in PORT_MMAP_DATA_SIZE units. */
    uint32_t              shm_mmap_limit;

    pthread_mutex_t       mutex;

    nxt_lvlhsh_t          processes;  /* of nxt_unit_process_t */
    nxt_lvlhsh_t          ports;      /* of nxt_unit_port_impl_t */

    nxt_unit_port_t       *router_port;
    nxt_unit_port_t       *shared_port;

    nxt_queue_t           contexts;   /* of nxt_unit_ctx_impl_t */

    nxt_unit_mmaps_t      incoming;
    nxt_unit_mmaps_t      outgoing;

    pid_t                 pid;
    int                   log_fd;

    /* Main context is embedded; extra contexts are allocated separately. */
    nxt_unit_ctx_impl_t   main_ctx;
};


/* Library-private part of a port; the public part is embedded first. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t      port;

    nxt_atomic_t         use_count;

    /* for nxt_unit_process_t.ports */
    nxt_queue_link_t     link;
    nxt_unit_process_t   *process;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t          awaiting_req;

    int                  ready;

    /* Shared-memory message queue mapped for this port, if any. */
    void                 *queue;

    int                  from_socket;
    nxt_unit_read_buf_t  *socket_rbuf;
};


/* Peer process bookkeeping: its ports and shared refcount. */
struct nxt_unit_process_s {
    pid_t             pid;

    nxt_queue_t       ports;  /* of nxt_unit_port_impl_t */

    nxt_unit_impl_t   *lib;

    nxt_atomic_t      use_count;

    uint32_t          next_port_id;
};


/*
Explicitly using 32 bit types to avoid possible alignment. */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Library entry point: create the nxt_unit_impl_t singleton, wire up the
 * ready/router/read ports (either from the init struct or from the
 * NXT_UNIT_INIT_ENV environment variable), create and map the read-port
 * message queue, and announce readiness to the Unit master process.
 *
 * Returns the main context on success, NULL on failure (all partially
 * acquired resources are released via the fail path).
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc, queue_fd;
    void             *mem;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, router_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    /* Sentinels so the fail path knows what needs to be cleaned up. */
    queue_fd = -1;
    mem = MAP_FAILED;

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        /* Caller supplied the ports explicitly (e.g. embedded usage). */
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        router_port = init->router_port;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
                              router_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        /* Launched by the Unit master: parse ports from the environment. */
        rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
                               &lib->log_fd, &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Convert the byte limit into a number of mmap segments. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                              / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;

    ctx = &lib->main_ctx.ctx;

    rc = nxt_unit_fd_blocking(router_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
    if (nxt_slow_path(lib->router_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add router_port");

        goto fail;
    }

    /* Shared-memory queue through which the router pushes messages. */
    queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    rc = nxt_unit_fd_blocking(read_port.in_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
    if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    rc = nxt_unit_fd_blocking(ready_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    /* Hand queue_fd to the master along with the PROCESS_READY message. */
    rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    /* Local copies are no longer needed; the peer holds its own fds. */
    nxt_unit_close(ready_port.out_fd);
    nxt_unit_close(queue_fd);

    return ctx;

fail:

    if (mem != MAP_FAILED) {
        munmap(mem, sizeof(nxt_port_queue_t));
    }

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* Dropping the last context reference tears down lib as well. */
    nxt_unit_ctx_release(&lib->main_ctx.ctx);

    return NULL;
}


/*
 * Allocate and initialize the library singleton: mutex, callbacks,
 * embedded main context, and the incoming/outgoing mmap arrays.
 * Returns NULL (freeing the allocation) on any failure.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    /* Trailing space holds the main context's per-request extra data. */
    lib = nxt_unit_malloc(NULL,
                          sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;
    lib->shm_mmap_limit =
(init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                          / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;

    nxt_queue_init(&lib->contexts);

    lib->use_count = 0;
    lib->router_port = NULL;
    lib->shared_port = NULL;

    /* Initializes the embedded main context; takes a lib reference. */
    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    cb = &lib->callbacks;

    if (cb->request_handler == NULL) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    nxt_unit_mmaps_init(&lib->incoming);
    nxt_unit_mmaps_init(&lib->outgoing);

    return lib;

fail:

    nxt_unit_free(NULL, lib);

    return NULL;
}


/*
 * Initialize one context: its mutex, free lists, preallocated buffers
 * and request, and link it into lib->contexts.  The context starts with
 * use_count 1 (the caller's reference) and ready == 0 until the router
 * acknowledges it.
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    /* Each live context keeps the library alive. */
    nxt_unit_lib_use(lib);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    ctx_impl->use_count = 1;
    ctx_impl->wait_items = 0;
    ctx_impl->online = 1;
    ctx_impl->ready = 0;

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);
    nxt_queue_init(&ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->free_rbuf);

    /* Seed the free list with the two embedded buffers. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
    nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);

    ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port = NULL;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/* Take a reference on the context. */
nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}


/* Drop a context reference; the last reference frees the context. */
nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
    long                 c;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* fetch_add returns the previous value: 1 means we held the last ref. */
    c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_ctx_free(ctx_impl);
    }
}


/* Take a reference on the library singleton. */
nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t *lib)
{
    nxt_atomic_fetch_add(&lib->use_count, 1);
}


/*
 * Drop a library reference; the last reference tears everything down:
 * all known processes, the mutex, the router/shared ports, both mmap
 * arrays, and finally the allocation itself.
 */
nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t *lib)
{
    long                c;
    nxt_unit_process_t  *process;

    c = nxt_atomic_fetch_add(&lib->use_count, -1);

    if (c == 1) {
        for ( ;; ) {
            pthread_mutex_lock(&lib->mutex);

            process = nxt_unit_process_pop_first(lib);
            if (process == NULL) {
                pthread_mutex_unlock(&lib->mutex);

                break;
            }

            /*
             * NOTE(review): the mutex is not unlocked here before looping;
             * presumably nxt_unit_remove_process() releases lib->mutex —
             * confirm against its definition.
             */
            nxt_unit_remove_process(lib, process);
        }

        pthread_mutex_destroy(&lib->mutex);

        if (nxt_fast_path(lib->router_port != NULL)) {
            nxt_unit_port_release(lib->router_port);
        }

        if (nxt_fast_path(lib->shared_port != NULL)) {
            nxt_unit_port_release(lib->shared_port);
        }

        nxt_unit_mmaps_destroy(&lib->incoming);
        nxt_unit_mmaps_destroy(&lib->outgoing);

        nxt_unit_free(NULL, lib);
    }
}


/* Push mmap_buf onto the intrusive list whose head pointer is *head. */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t
**head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    /* 'prev' stores the address of the link pointing at this buffer. */
    mmap_buf->prev = head;
}


/* Append mmap_buf at the tail of the list whose head link is *prev. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    /* Walk to the terminating NULL link, then insert there. */
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/* Detach mmap_buf from its list; a NULL 'prev' means it is not linked. */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse the NXT_UNIT_INIT_ENV environment variable set by the Unit
 * master.  Format (after a NXT_VERSION prefix and ';'):
 *
 *   stream;ready_pid,ready_id,ready_fd;router_pid,router_id,router_fd;
 *   read_pid,read_id,read_in_fd,read_out_fd;log_fd,shm_limit
 *
 * Fills the three port structs, the log fd, the ready stream id, and the
 * shared-memory limit.  Returns NXT_UNIT_ERROR if the variable is absent,
 * the version does not match, or fewer than 13 fields scan.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, router_fd, read_in_fd, read_out_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, router_pid, read_pid;
    uint32_t  ready_stream, router_id, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    /* The library refuses to talk to a master of a different version. */
    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    rc = sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d,%d;"
                "%d,%"PRIu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &router_pid, &router_id, &router_fd,
                &read_pid, &read_id, &read_in_fd, &read_out_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 13)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    /* The ready port is write-only from this side. */
    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);

    router_port->in_fd = -1;
    router_port->out_fd = router_fd;
    router_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_in_fd;
    read_port->out_fd = read_out_fd;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Send the PROCESS_READY message to the master on ready_fd, passing
 * queue_fd as SCM_RIGHTS ancillary data so the master can map this
 * process's read queue.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    /* Ancillary-data buffer sized for exactly one file descriptor. */
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     * *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     * dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));

    res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
                           &cmsg, sizeof(cmsg));
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Decode and dispatch one message read into rbuf: extract up to two
 * passed file descriptors from the ancillary data, validate the header,
 * optionally pull payload from shared memory, then branch on the message
 * type.  Owns recv_msg.fd[0..1] and closes whichever ones a handler did
 * not take over (handlers signal ownership transfer by setting the slot
 * to -1).  The read buffer is released unless the result is
 * NXT_UNIT_AGAIN.
 */
static int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                  rc;
    pid_t                pid;
    struct cmsghdr       *cm;
    nxt_port_msg_t       *port_msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_recv_msg_t  recv_msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    recv_msg.fd[0] = -1;
    recv_msg.fd[1] = -1;
    port_msg = (nxt_port_msg_t *) rbuf->buf;
    /*
     * NOTE(review): cm fields are read before any size check; this
     * assumes the receive path always leaves rbuf->oob zeroed or filled —
     * confirm in the port recv functions.
     */
    cm = (struct cmsghdr *) rbuf->oob;

    if (cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
        }

        if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
        }
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
        /* A zero-byte read means the peer closed the port: quit cleanly. */
        if (nxt_slow_path(rbuf->size == 0)) {
            nxt_unit_debug(ctx, "read port closed");

            nxt_unit_quit(ctx);
            rc = NXT_UNIT_OK;
            goto done;
        }

        nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

    nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
                   port_msg->stream, (int) port_msg->type,
                   recv_msg.fd[0], recv_msg.fd[1]);

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* Inline payload follows the fixed header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    if (port_msg->mmap) {
        /* Payload lives in shared memory; pull it into incoming_buf. */
        rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            if (rc == NXT_UNIT_AGAIN) {
                /* Message will be retried later; keep the fds open. */
                recv_msg.fd[0] = -1;
                recv_msg.fd[1] = -1;
            }

            goto done;
        }
    }

    switch (port_msg->type) {

    case _NXT_PORT_MSG_RPC_READY:
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        nxt_unit_quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_PORT_ACK:
        rc = nxt_unit_ctx_ready(ctx);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /* Log rotation: redirect lib->log_fd to the passed descriptor. */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd[0]);

        if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
            nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
                           port_msg->stream, recv_msg.fd[0], lib->log_fd,
                           strerror(errno), errno);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd[0] < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd[0]);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REQ_BODY:
        rc =
nxt_unit_process_req_body(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                           (int) sizeof(pid));

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

done:

    /* Close any passed fds a handler did not take ownership of. */
    if (recv_msg.fd[0] != -1) {
        nxt_unit_close(recv_msg.fd[0]);
    }

    if (recv_msg.fd[1] != -1) {
        nxt_unit_close(recv_msg.fd[1]);
    }

    /*
     * Not an infinite loop: freeing a buffer unlinks it, so the list
     * head advances to the next element each iteration.
     */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        /* Poison released buffers to catch stale reads in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handle a NEW_PORT message: fd[0] is the port socket, fd[1] backs the
 * port's shared-memory message queue.  NXT_UNIT_SHARED_PORT_ID denotes
 * the application-wide shared port (read side, nxt_app_queue_t);
 * anything else is a regular write port (nxt_port_queue_t).  On success
 * fd[0]'s ownership moves to the new port (signalled to the caller by
 * resetting recv_msg->fd[0] to -1); fd[1] is closed by the caller after
 * the queue has been mapped.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* Shared app port: this process reads requests from it. */
        nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);

        new_port.in_fd = recv_msg->fd[0];
        new_port.out_fd = -1;

        mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);

    } else {
        /* Regular peer port: this process writes to it. */
        if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
                          != NXT_UNIT_OK))
        {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                              new_port_msg->id);

        new_port.in_fd = -1;
        new_port.out_fd = recv_msg->fd[0];

        mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);
    }

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /* fd[0] is now owned by new_port; stop the caller from closing it. */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* lib keeps the shared-port reference; context becomes ready. */
        lib->shared_port = port;

        return nxt_unit_ctx_ready(ctx);
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Mark the context ready and invoke the application's optional
 * ready_handler callback.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib =
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    ctx_impl->ready = 1;

    if (lib->callbacks.ready_handler) {
        return lib->callbacks.ready_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a request-headers message: allocate a request info structure,
 * take ownership of the incoming shared-memory buffers and body fd,
 * acknowledge the headers, and hand the request to the application.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           res;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Preread body content follows the headers in the first buffer. */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    req_impl->stream = recv_msg->stream;

    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* Take ownership of the request body fd, if any. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;
    req_impl->in_hash = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    res = nxt_unit_request_check_response_port(req, &port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(res == NXT_UNIT_OK)) {
        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            return NXT_UNIT_ERROR;
        }

        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            /* More body data is expected; park the request in the hash. */
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                return NXT_UNIT_ERROR;
            }

            /*
             * If the application has a separate data handler, we may start
             * request processing and process data when it arrives.
             */
            if (lib->callbacks.data_handler == NULL) {
                return NXT_UNIT_OK;
            }
        }

        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Handle an additional request-body message: append its buffers to the
 * request found in the stream hash and notify the application.
 */
static int
nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    uint64_t                 l;
    nxt_unit_impl_t          *lib;
    nxt_unit_mmap_buf_t      *b;
    nxt_unit_request_info_t  *req;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (req == NULL) {
        return NXT_UNIT_OK;
    }

    /* Total buffered body bytes after appending this message. */
    l = req->content_buf->end - req->content_buf->free;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
        l += b->buf.end - b->buf.free;
    }

    if (recv_msg->incoming_buf != NULL) {
        b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);

        /* "Move" incoming buffer list to req_impl. */
        nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
        recv_msg->incoming_buf = NULL;
    }

    /* Take ownership of the body fd, if any. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.data_handler != NULL) {
        lib->callbacks.data_handler(req);

        return NXT_UNIT_OK;
    }

    /* No data handler: run the request once the whole body is available. */
    if (req->content_fd != -1 || l == req->content_length) {
        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Find or create the port the response should be sent to.  Returns
 * NXT_UNIT_OK when the port is ready, NXT_UNIT_AGAIN when the request
 * must wait for the port to arrive, or NXT_UNIT_ERROR on failure.
 */
static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
    nxt_unit_port_id_t *port_id)
{
    int                           res;
    nxt_unit_ctx_t                *ctx;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_process_t            *process;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_port_impl_t          *port_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
    /* port_impl is meaningful only when port != NULL; checked below. */
    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    if (nxt_fast_path(port != NULL)) {
        req->response_port = port;

        if (nxt_fast_path(port_impl->ready)) {
            pthread_mutex_unlock(&lib->mutex);

            nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
                           (int) port->id.pid, (int) port->id.id);

            return NXT_UNIT_OK;
        }

        nxt_unit_debug(ctx, "check_response_port: "
                       "port{%d,%d} already requested",
                       (int) port->id.pid, (int) port->id.id);

        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        /* Queue the request until the port becomes ready. */
        nxt_queue_insert_tail(&port_impl->awaiting_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&lib->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

        return NXT_UNIT_AGAIN;
    }

    /* Port is not known yet: create a placeholder entry and request it. */
    port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(port_impl == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
                       (int) sizeof(nxt_unit_port_impl_t));

        pthread_mutex_unlock(&lib->mutex);

        return NXT_UNIT_ERROR;
    }

    port = &port_impl->port;

    port->id = *port_id;
    port->in_fd = -1;
    port->out_fd = -1;
    port->data = NULL;

    res = nxt_unit_port_hash_add(&lib->ports, port);
    if (nxt_slow_path(res != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        pthread_mutex_unlock(&lib->mutex);

        /* port is the first member of port_impl, so this frees port_impl. */
        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_process_find(lib, port_id->pid, 0);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: process %d not found",
                       port->id.pid);

        /* Third argument 1: remove the just-added port from the hash. */
        nxt_unit_port_hash_find(&lib->ports, port_id, 1);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    nxt_queue_insert_tail(&process->ports, &port_impl->link);

    port_impl->process = process;
    port_impl->queue = NULL;
    port_impl->from_socket = 0;
    port_impl->socket_rbuf = NULL;

    nxt_queue_init(&port_impl->awaiting_req);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);

    port_impl->use_count = 2;
    port_impl->ready = 0;

    req->response_port = port;

    pthread_mutex_unlock(&lib->mutex);

    /* Ask the router to send us the real port file descriptors. */
    res = nxt_unit_get_port(ctx, port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    return NXT_UNIT_AGAIN;
}


/*
 * Send a REQ_HEADERS_ACK so the router directs further body messages
 * for this stream to this context's read port.
 */
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
    ssize_t                       res;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = ctx_impl->read_port->id.id;
    msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;

    res = nxt_unit_port_send(req->ctx, req->response_port,
                             &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a websocket frame message for an upgraded request, plus the
 * final "last" notification that closes the connection.
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (nxt_slow_path(req == NULL)) {
        return NXT_UNIT_OK;
    }

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* A websocket frame header is at least 2 bytes. */
    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /* Plain (non-mmap) payload: wrap it in a buffer structure. */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* The 4-byte masking key is the last part of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the frame header; "free" now points at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                            ws_impl->ws.header->opcode,
                            ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        /* Connection closed: leave websocket mode and finish the request. */
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/* Invoke the application's shared-memory ack handler, if registered. */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->shm_ack_handler != NULL) {
        cb->shm_ack_handler(ctx);
    }

    return
NXT_UNIT_OK;
}


/*
 * Get a request info structure: reuse one from the context's free list
 * or allocate a new one with request_data_size extra bytes for the
 * application's per-request data.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Drop the lock while allocating. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
                                        + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    /* Application-private data area, if configured at init time. */
    req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Release all request resources (hash entry, buffers, body fd, response
 * port) and move the structure back to the context's free list.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->in_hash) {
        /* Third argument 1: remove the request from the stream hash. */
        nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
    }

    req_impl->websocket = 0;

    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    if (req->response_port != NULL) {
        nxt_unit_port_release(req->response_port);

        req->response_port = NULL;
    }

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->state = NXT_UNIT_RS_RELEASED;
}


/* Free a request info structure, unless it is the one embedded in ctx. */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        nxt_unit_free(&ctx_impl->ctx, req_impl);
    }
}


/* Get a websocket frame structure from the free list or allocate one. */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx,
nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        /* Drop the lock while allocating. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/* Free the frame's buffers and return it to the context's free list. */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/* Unlink and free a websocket frame structure. */
static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
    nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    nxt_unit_free(ctx, ws_impl);
}


/* Case-insensitive header-name hash; must match the router's hashing. */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Group repeated header fields next to their first occurrence and
 * record the indexes of the well-known Content-Length, Content-Type
 * and Cookie fields.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /* Hash match is only a hint; confirm with a name comparison. */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /* Duplicate found at j: rotate it down to position i + 1. */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity.
             */
            nxt_unit_sptr_set(&fields[j].name, name);

            i++;
        }
    }
}


/*
 * Allocate and initialize the response structure: the status code plus
 * room for max_fields_count fields and max_fields_size bytes of field
 * name/value data.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        /* Reuse the previous buffer if it is already big enough. */
        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Field data goes after the response header and the fields array. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);
req->response_max_fields = max_fields_count; 2007 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 2008 2009 return NXT_UNIT_OK; 2010 } 2011 2012 2013 int 2014 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 2015 uint32_t max_fields_count, uint32_t max_fields_size) 2016 { 2017 char *p; 2018 uint32_t i, buf_size; 2019 nxt_unit_buf_t *buf; 2020 nxt_unit_field_t *f, *src; 2021 nxt_unit_response_t *resp; 2022 nxt_unit_request_info_impl_t *req_impl; 2023 2024 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2025 2026 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2027 nxt_unit_req_warn(req, "realloc: response not init"); 2028 2029 return NXT_UNIT_ERROR; 2030 } 2031 2032 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2033 nxt_unit_req_warn(req, "realloc: response already sent"); 2034 2035 return NXT_UNIT_ERROR; 2036 } 2037 2038 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 2039 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 2040 2041 return NXT_UNIT_ERROR; 2042 } 2043 2044 /* 2045 * Each field name and value 0-terminated by libunit, 2046 * this is the reason of '+ 2' below. 
2047 */ 2048 buf_size = sizeof(nxt_unit_response_t) 2049 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2050 + max_fields_size; 2051 2052 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 2053 2054 buf = nxt_unit_response_buf_alloc(req, buf_size); 2055 if (nxt_slow_path(buf == NULL)) { 2056 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 2057 return NXT_UNIT_ERROR; 2058 } 2059 2060 resp = (nxt_unit_response_t *) buf->start; 2061 2062 memset(resp, 0, sizeof(nxt_unit_response_t)); 2063 2064 resp->status = req->response->status; 2065 resp->content_length = req->response->content_length; 2066 2067 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 2068 f = resp->fields; 2069 2070 for (i = 0; i < req->response->fields_count; i++) { 2071 src = req->response->fields + i; 2072 2073 if (nxt_slow_path(src->skip != 0)) { 2074 continue; 2075 } 2076 2077 if (nxt_slow_path(src->name_length + src->value_length + 2 2078 > (uint32_t) (buf->end - p))) 2079 { 2080 nxt_unit_req_warn(req, "realloc: not enough space for field" 2081 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 2082 i, src, src->name_length, src->value_length); 2083 2084 goto fail; 2085 } 2086 2087 nxt_unit_sptr_set(&f->name, p); 2088 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 2089 *p++ = '\0'; 2090 2091 nxt_unit_sptr_set(&f->value, p); 2092 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 2093 *p++ = '\0'; 2094 2095 f->hash = src->hash; 2096 f->skip = 0; 2097 f->name_length = src->name_length; 2098 f->value_length = src->value_length; 2099 2100 resp->fields_count++; 2101 f++; 2102 } 2103 2104 if (req->response->piggyback_content_length > 0) { 2105 if (nxt_slow_path(req->response->piggyback_content_length 2106 > (uint32_t) (buf->end - p))) 2107 { 2108 nxt_unit_req_warn(req, "realloc: not enought space for content" 2109 " #%"PRIu32", %"PRIu32" required", 2110 i, req->response->piggyback_content_length); 2111 2112 goto fail; 2113 } 
        /* Copy the piggyback content into the new buffer. */
        resp->piggyback_content_length =
            req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* Release the old buffer and install the new one. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/* Return non-zero once nxt_unit_response_init() has been called. */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
}


/*
 * Append a header field to the initialized response.  The name and
 * value are copied into the response buffer and 0-terminated.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    /* '+ 2' accounts for the two 0-terminators. */
    if (nxt_slow_path(name_length + value_length + 2
                      > (uint32_t) (buf->end - buf->free)))
    {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/* Append raw bytes to the response's piggyback content. */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* First chunk: record where the content starts. */
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Send the response headers (and any piggyback content) to the router.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    /* A 101 reply to a websocket handshake implies an upgrade. */
    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_free(mmap_buf);
    }

    return rc;
}


/* Return non-zero once the response headers have been sent. */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocate a shared-memory output buffer of the given size and link it
 * into the request's outgoing buffer list.  Returns NULL on failure.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    /* A single buffer cannot exceed the port mmap data size. */
    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    mmap_buf->req = req;

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   size, size, mmap_buf,
                                   NULL);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_release(mmap_buf);

        return NULL;
    }

    return &mmap_buf->buf;
}


/* Get an mmap buffer structure from the free list or allocate one. */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *mmap_buf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->free_buf == NULL) {
        /* Drop the lock while allocating. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(mmap_buf == NULL)) {
            return NULL;
        }

    } else {
        mmap_buf = ctx_impl->free_buf;

        nxt_unit_mmap_buf_unlink(mmap_buf);

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    mmap_buf->ctx_impl = ctx_impl;

    mmap_buf->hdr = NULL;
    mmap_buf->free_ptr = NULL;

    return mmap_buf;
}


/* Return an mmap buffer structure to its context's free list. */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_unlink(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* Return non-zero if the request is a websocket handshake. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Switch the request into websocket mode: register it in the stream
 * hash and force a 101 response status.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t
/*
 * NOTE(review): continuation of nxt_unit_response_upgrade() -- marks the
 * request as an accepted WebSocket upgrade (adds it to the request hash so
 * incoming frames can find it, and forces status 101).
 */
*req)
{
    int                           rc;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* Upgrading twice is harmless; report success. */
    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_request_hash_add(req->ctx, req);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


/* Returns non-zero once the request has been upgraded to a WebSocket. */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/*
 * Recovers the request from a pointer to its per-request extra_data area
 * (the inverse of handing extra_data out to the application).
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Sends a body buffer previously obtained from nxt_unit_response_buf_alloc().
 * Headers must already have been sent.  Empty buffers are not transmitted.
 * The buffer is freed in every success path.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    /* Skip the port round-trip for an empty buffer. */
    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    return NXT_UNIT_OK;
}


/*
 * Sends "buf" with the "last" flag set and releases the request on success;
 * on failure falls back to nxt_unit_request_done() to report the error.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                      rc;
    nxt_unit_mmap_buf_t      *mmap_buf;
    nxt_unit_request_info_t  *req;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
    if (nxt_slow_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(mmap_buf);

        nxt_unit_request_info_release(req);

    } else {
        nxt_unit_request_done(req, rc);
    }
}


/*
 * Core buffer transmit.  Shared-memory buffers are sent by reference as a
 * (mmap_id, chunk_id, size) triple; plain buffers are sent inline with the
 * port message header copied into the space reserved in front of the data.
 * In either case the buffer's backing storage is released before return.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    /* Wire format: port message header immediately followed by mmap ref. */
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Only non-empty shared-memory buffers go by reference. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* First chunk past the data actually used by this send. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * Enough room remains for another chunk: shrink the buffer to
             * the unsent tail so it can be reused.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Whole area consumed; detach the buffer from its mmap. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * Chunks handed to the router no longer count against this
         * process; the delta below is negative or zero.
         */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                             (int) m.mmap_msg.chunk_id
                              - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain send requires sizeof(m.msg) bytes reserved in front of
         * buf->start (see plain_ptr); bail out loudly if not available.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg),
                                 NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}


/* Public wrapper: releases a buffer without sending it. */
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
}


/* Frees the buffer's backing storage and recycles its descriptor. */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}


/*
 * Releases whichever storage backs the buffer: shared-memory chunks
 * (hdr != NULL) or heap memory (free_ptr != NULL).  Idempotent.
 */
static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
    if (mmap_buf->hdr != NULL) {
        nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
                              mmap_buf->hdr, mmap_buf->buf.start,
                              mmap_buf->buf.end - mmap_buf->buf.start);

        mmap_buf->hdr = NULL;

        return;
    }

    if (mmap_buf->free_ptr != NULL) {
        nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);

        mmap_buf->free_ptr = NULL;
    }
}


/*
 * Gets a read buffer under the context mutex and clears its out-of-band
 * control-message area before handing it out.
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf = nxt_unit_read_buf_get_impl(ctx_impl);

    pthread_mutex_unlock(&ctx_impl->mutex);

    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    return rbuf;
}


/*
 * Lock-free part of read-buffer allocation: pops from the free queue or
 * mallocs a new buffer.  Caller must hold ctx_impl->mutex.
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_queue_link_t     *link;
    nxt_unit_read_buf_t  *rbuf;

    if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
        link = nxt_queue_first(&ctx_impl->free_rbuf);
        nxt_queue_remove(link);
/* NOTE(review): continuation of nxt_unit_read_buf_get_impl(). */
        rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);

        return rbuf;
    }

    rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));

    if (nxt_fast_path(rbuf != NULL)) {
        rbuf->ctx_impl = ctx_impl;
    }

    return rbuf;
}


/* Returns a read buffer to the per-context free queue. */
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/* Returns the next buffer in a chain, or NULL at the end. */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *mmap_buf;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    if (mmap_buf->next == NULL) {
        return NULL;
    }

    return &mmap_buf->next->buf;
}


/* Largest single buffer the shared-memory transport can carry. */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}


/* Smallest shared-memory allocation unit (one chunk). */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Blocking write: sends all of "size" bytes (min_size == size) and maps
 * the ssize_t result of the non-blocking variant back to a unit rc code.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    ssize_t  res;

    res = nxt_unit_response_write_nb(req, start, size, size);

    /* Negative result encodes -rc; non-negative means success here. */
    return res < 0 ? -res : NXT_UNIT_OK;
}


/*
 * Non-blocking write of response body data.  Sends at least "min_size"
 * bytes (waiting for shared memory if necessary) and up to "size" bytes.
 * Returns the number of bytes sent, or a negated NXT_UNIT_* error code.
 * If the headers buffer has not been flushed yet, data is first piggybacked
 * onto it and the headers are sent.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Fill the tail of the headers buffer, then flush the headers. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /* Out of shared memory in non-blocking mode: short write. */
            return sent;
        }

        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Streams response body data produced by the caller-supplied read_info
 * callback until read_info->eof, sending it buffer by buffer.  The first
 * portion may be piggybacked onto the headers buffer.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Reads up to "size" bytes of request body into "dst": first from the
 * in-memory buffer chain, then from the spill file (content_fd), which is
 * closed once exhausted.  Returns bytes read, or a negative value on a
 * read(2) failure.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    ssize_t  buf_res, res;

    buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
                                dst, size);

    nxt_unit_req_debug(req, "read: %d", (int) buf_res);

    if (buf_res < (ssize_t) size && req->content_fd != -1) {
        res = read(req->content_fd, dst, size);
        if (nxt_slow_path(res < 0)) {
            nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                               strerror(errno), errno);

            return res;
        }

        /* Short read means the spill file is exhausted. */
        if (res < (ssize_t) size) {
/* NOTE(review): continuation of nxt_unit_request_read(). */
            nxt_unit_close(req->content_fd);

            req->content_fd = -1;
        }

        req->content_length -= res;
        size -= res;

        dst = nxt_pointer_to(dst, res);

    } else {
        res = 0;
    }

    return buf_res + res;
}


/*
 * Returns the length of the next line (up to and including '\n', capped at
 * max_size) available in the request body, prereading spilled content from
 * content_fd into extra buffers as needed.  Returns 0 when no content is
 * left and -1 on preread failure.
 */
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
    char                 *p;
    size_t               l_size, b_size;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;

    if (req->content_length == 0) {
        return 0;
    }

    l_size = 0;

    b = req->content_buf;

    while (b != NULL) {
        b_size = b->end - b->free;
        p = memchr(b->free, '\n', b_size);

        if (p != NULL) {
            p++;
            l_size += p - b->free;
            break;
        }

        l_size += b_size;

        if (max_size <= l_size) {
            break;
        }

        /* No newline yet: pull more spilled content into the chain. */
        mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
        if (mmap_buf->next == NULL
            && req->content_fd != -1
            && l_size < req->content_length)
        {
            preread_buf = nxt_unit_request_preread(req, 16384);
            if (nxt_slow_path(preread_buf == NULL)) {
                return -1;
            }

            nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
        }

        b = nxt_unit_buf_next(b);
    }

    return nxt_min(max_size, l_size);
}


/*
 * Reads up to "size" bytes from the request spill file into a fresh
 * heap-backed buffer.  Closes content_fd on a short read (end of spill).
 * Returns NULL on allocation or read failure.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
{
    ssize_t              res;
    nxt_unit_mmap_buf_t  *mmap_buf;

    if (req->content_fd == -1) {
        nxt_unit_req_alert(req, "preread: content_fd == -1");
        return NULL;
    }

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf");
        return NULL;
    }

    mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
    if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
        nxt_unit_mmap_buf_release(mmap_buf);
        return NULL;
    }

    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (res < (ssize_t) size) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Trim the buffer to the bytes actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copies up to "size" bytes out of a buffer chain into "dst", advancing
 * buffer read positions, *b to the last visited buffer, and decrementing
 * *len by the amount copied.  Returns the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Finishes request processing.  On success, a default 200 text/plain
 * response is synthesized if none was initialized, and the pending headers
 * buffer (if any) is flushed with the "last" flag.  Otherwise a bare
 * last/RPC_ERROR message is sent and the request is released.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* Application never started a response: synthesize a minimal one. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends with last == 1 and releases the request on success. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    (void) nxt_unit_port_send(req->ctx, req->response_port,
                              &msg, sizeof(msg), NULL, 0);

    nxt_unit_request_info_release(req);
}


/* Convenience wrapper around nxt_unit_websocket_sendv() for one span. */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Sends one WebSocket frame gathered from "iov", splitting the payload
 * across multiple transport buffers when it exceeds one mmap data area.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size, alloc_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_unit_mmap_buf_t     mmap_buf;
    nxt_websocket_header_t  *wh;
    char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    /*
     * 10 extra bytes for the frame header -- presumably the maximum
     * unmasked header size; confirm against nxt_websocket_frame_init().
     */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    buf->start[0] = 0;
    buf->start[1] = 0;

    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                if (nxt_fast_path(buf->free >
/* NOTE(review): continuation of nxt_unit_websocket_sendv(). */
buf->start)) {
                    /* Flush the filled buffer and allocate the next one. */
                    rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
    }

    return rc;
}


/*
 * Reads frame payload into "dst", unmasking it in place with the 4-byte
 * client mask when present.  "d" is the payload offset of the first byte
 * read, used to keep the mask rotation aligned across partial reads.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[(i + d) % 4];
    }

    return res;
}


/*
 * Makes a frame survive beyond its read buffer by copying the frame bytes
 * into heap memory and repointing header/mask/content into the copy.
 * No-op if the frame already owns its storage.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, hsize;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = nxt_unit_malloc(ws->req->ctx, size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    hsize = nxt_websocket_frame_header_size(b);

    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b + hsize;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    ws_impl->ws.header = (nxt_websocket_header_t *) b;

    /* The mask, when present, is the last 4 bytes of the header. */
    if (ws_impl->ws.header->mask) {
        ws_impl->ws.mask = (uint8_t *) b + hsize - 4;

    } else {
        ws_impl->ws.mask = NULL;
    }

    return NXT_UNIT_OK;
}


/* Releases a fully processed WebSocket frame. */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Finds (or creates) an outgoing shared-memory segment with at least
 * "min_n" contiguous free chunks, preferring up to *n.  On return *c is
 * the first chunk id and *n the number of chunks actually reserved.
 * May block waiting for a shm ACK when the mmap limit is reached and
 * min_n > 0.  Returns NULL when no memory can be provided.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->outgoing.mutex);

retry:

    outgoing_size = lib->outgoing.size;

    mm_end = lib->outgoing.elts + outgoing_size;

    for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /*
         * Skip segments bound to another port or created by another
         * thread (0xFFFF marks a segment usable by any port).
         */
        if (hdr->sent_over != 0xFFFFu
            && (hdr->sent_over != port->id.id
                || mm->src_thread != pthread_self()))
        {
            continue;
        }

        *c = 0;

        /* Scan for a contiguous run of free chunks starting at *c. */
        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run interrupted; accept it if long enough. */
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Too short: roll back the chunks grabbed so far. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment has no suitable run; flag out-of-shared-memory on it. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&lib->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by application, but not sent to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition.  Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&lib->outgoing.mutex);

        goto retry;
    }

    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, port, *n);

unlock:

    nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "allocated_chunks %d",
                   (int) lib->outgoing.allocated_chunks);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    return hdr;
}


/* Notifies the router that this process is out of shared memory. */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Blocks on the context read port until a SHM_ACK message arrives,
 * queueing any other messages received meanwhile for later processing.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  res;
3566 nxt_unit_ctx_impl_t *ctx_impl; 3567 nxt_unit_read_buf_t *rbuf; 3568 3569 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3570 3571 while (1) { 3572 rbuf = nxt_unit_read_buf_get(ctx); 3573 if (nxt_slow_path(rbuf == NULL)) { 3574 return NXT_UNIT_ERROR; 3575 } 3576 3577 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); 3578 if (res == NXT_UNIT_ERROR) { 3579 nxt_unit_read_buf_release(ctx, rbuf); 3580 3581 return NXT_UNIT_ERROR; 3582 } 3583 3584 if (nxt_unit_is_shm_ack(rbuf)) { 3585 nxt_unit_read_buf_release(ctx, rbuf); 3586 break; 3587 } 3588 3589 pthread_mutex_lock(&ctx_impl->mutex); 3590 3591 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); 3592 3593 pthread_mutex_unlock(&ctx_impl->mutex); 3594 3595 if (nxt_unit_is_quit(rbuf)) { 3596 nxt_unit_debug(ctx, "oosm: quit received"); 3597 3598 return NXT_UNIT_ERROR; 3599 } 3600 } 3601 3602 return NXT_UNIT_OK; 3603 } 3604 3605 3606 static nxt_unit_mmap_t * 3607 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3608 { 3609 uint32_t cap, n; 3610 nxt_unit_mmap_t *e; 3611 3612 if (nxt_fast_path(mmaps->size > i)) { 3613 return mmaps->elts + i; 3614 } 3615 3616 cap = mmaps->cap; 3617 3618 if (cap == 0) { 3619 cap = i + 1; 3620 } 3621 3622 while (i + 1 > cap) { 3623 3624 if (cap < 16) { 3625 cap = cap * 2; 3626 3627 } else { 3628 cap = cap + cap / 2; 3629 } 3630 } 3631 3632 if (cap != mmaps->cap) { 3633 3634 e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); 3635 if (nxt_slow_path(e == NULL)) { 3636 return NULL; 3637 } 3638 3639 mmaps->elts = e; 3640 3641 for (n = mmaps->cap; n < cap; n++) { 3642 e = mmaps->elts + n; 3643 3644 e->hdr = NULL; 3645 nxt_queue_init(&e->awaiting_rbuf); 3646 } 3647 3648 mmaps->cap = cap; 3649 } 3650 3651 if (i + 1 > mmaps->size) { 3652 mmaps->size = i + 1; 3653 } 3654 3655 return mmaps->elts + i; 3656 } 3657 3658 3659 static nxt_port_mmap_header_t * 3660 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) 3661 { 3662 int i, fd, rc; 3663 
    void                    *mem;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Reserve the next slot in the outgoing array (mutex held by caller). */
    mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All chunks start out free (bitmaps are "1 == free"). */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = lib->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = port->id.pid;
    hdr->sent_over = port->id.id;
    mm->src_thread = pthread_self();

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk.
     */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /*
     * Drop the outgoing mutex while sending the descriptor to the peer;
     * the reserved slot stays valid because only this thread removes it.
     */
    pthread_mutex_unlock(&lib->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) port->id.pid);
    }

    /* The fd is no longer needed locally once mapped and sent. */
    nxt_unit_close(fd);

    pthread_mutex_lock(&lib->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    /* Roll back the slot reserved at the top of the function. */
    lib->outgoing.size--;

    return NULL;
}


/*
 * Create an anonymous shared memory object of 'size' bytes and return its
 * file descriptor, or -1 on failure.  Uses memfd_create, shm_open(SHM_ANON)
 * or shm_open+shm_unlink depending on platform support.
 */
static int
nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
{
    int              fd;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
    char    name[64];

    /* Name is informational only (pid + thread make it unique enough). */
    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());
#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        return -1;
    }

    nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
                       strerror(errno), errno);

        return -1;
    }

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case.
     */
    shm_unlink(name);

    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        return -1;
    }

    /* Unlink immediately: the object lives on through the open fd only. */
    if (nxt_slow_path(shm_unlink(name) == -1)) {
        nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
                       strerror(errno), errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        return -1;
    }

    return fd;
}


/*
 * Pass the shared memory descriptor 'fd' to the peer of 'port' as an
 * SCM_RIGHTS control message attached to an _NXT_PORT_MSG_MMAP message.
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_MMAP;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    /*
     * Fill all padding fields with 0.
     * Code in Go 1.11 validate cmsghdr using padding field as part of len.
     * See Cmsghdr definition and socketControlMessageHeaderAndData function.
     */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
                             &cmsg, sizeof(cmsg));
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Prepare an outgoing buffer of 'size' bytes in 'mmap_buf'.  Small payloads
 * (<= NXT_UNIT_MAX_PLAIN_SIZE) go into a plain malloc'ed or caller-supplied
 * local buffer; larger ones are carved out of a shared memory segment.
 * 'min_size' is the smallest acceptable allocation when the full 'size'
 * cannot be obtained.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    uint32_t size, uint32_t min_size,
    nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
    int                     nchunks, min_nchunks;
    nxt_chunk_id_t          c;
    nxt_port_mmap_header_t  *hdr;

    if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
        if (local_buf != NULL) {
            /* Caller-provided stack buffer: nothing to free later. */
            mmap_buf->free_ptr = NULL;
            mmap_buf->plain_ptr = local_buf;

        } else {
            /* Room for the port message header precedes the payload. */
            mmap_buf->free_ptr = nxt_unit_malloc(ctx,
                                                 size + sizeof(nxt_port_msg_t));
            if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
                return NXT_UNIT_ERROR;
            }

            mmap_buf->plain_ptr = mmap_buf->free_ptr;
        }

        mmap_buf->hdr = NULL;
        mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
        mmap_buf->buf.free = mmap_buf->buf.start;
        mmap_buf->buf.end = mmap_buf->buf.start + size;

        nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
                       mmap_buf->buf.start, (int) size);

        return NXT_UNIT_OK;
    }

    /* Round both sizes up to whole shared memory chunks. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
    min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) /
PORT_MMAP_CHUNK_SIZE; 3907 3908 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3909 if (nxt_slow_path(hdr == NULL)) { 3910 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3911 mmap_buf->hdr = NULL; 3912 mmap_buf->buf.start = NULL; 3913 mmap_buf->buf.free = NULL; 3914 mmap_buf->buf.end = NULL; 3915 mmap_buf->free_ptr = NULL; 3916 3917 return NXT_UNIT_OK; 3918 } 3919 3920 return NXT_UNIT_ERROR; 3921 } 3922 3923 mmap_buf->hdr = hdr; 3924 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3925 mmap_buf->buf.free = mmap_buf->buf.start; 3926 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3927 mmap_buf->free_ptr = NULL; 3928 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3929 3930 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3931 (int) hdr->id, (int) c, 3932 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3933 3934 return NXT_UNIT_OK; 3935 } 3936 3937 3938 static int 3939 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3940 { 3941 int rc; 3942 void *mem; 3943 nxt_queue_t awaiting_rbuf; 3944 struct stat mmap_stat; 3945 nxt_unit_mmap_t *mm; 3946 nxt_unit_impl_t *lib; 3947 nxt_unit_ctx_impl_t *ctx_impl; 3948 nxt_unit_read_buf_t *rbuf; 3949 nxt_port_mmap_header_t *hdr; 3950 3951 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3952 3953 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3954 3955 if (fstat(fd, &mmap_stat) == -1) { 3956 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3957 strerror(errno), errno); 3958 3959 return NXT_UNIT_ERROR; 3960 } 3961 3962 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3963 MAP_SHARED, fd, 0); 3964 if (nxt_slow_path(mem == MAP_FAILED)) { 3965 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3966 strerror(errno), errno); 3967 3968 return NXT_UNIT_ERROR; 3969 } 3970 3971 hdr = mem; 3972 3973 if (nxt_slow_path(hdr->src_pid != pid)) { 3974 3975 
nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " 3976 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3977 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3978 3979 munmap(mem, PORT_MMAP_SIZE); 3980 3981 return NXT_UNIT_ERROR; 3982 } 3983 3984 nxt_queue_init(&awaiting_rbuf); 3985 3986 pthread_mutex_lock(&lib->incoming.mutex); 3987 3988 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); 3989 if (nxt_slow_path(mm == NULL)) { 3990 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); 3991 3992 munmap(mem, PORT_MMAP_SIZE); 3993 3994 rc = NXT_UNIT_ERROR; 3995 3996 } else { 3997 mm->hdr = hdr; 3998 3999 hdr->sent_over = 0xFFFFu; 4000 4001 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 4002 nxt_queue_init(&mm->awaiting_rbuf); 4003 4004 rc = NXT_UNIT_OK; 4005 } 4006 4007 pthread_mutex_unlock(&lib->incoming.mutex); 4008 4009 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 4010 4011 ctx_impl = rbuf->ctx_impl; 4012 4013 pthread_mutex_lock(&ctx_impl->mutex); 4014 4015 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 4016 4017 pthread_mutex_unlock(&ctx_impl->mutex); 4018 4019 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 4020 4021 nxt_unit_awake_ctx(ctx, ctx_impl); 4022 4023 } nxt_queue_loop; 4024 4025 return rc; 4026 } 4027 4028 4029 static void 4030 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) 4031 { 4032 nxt_port_msg_t msg; 4033 4034 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { 4035 return; 4036 } 4037 4038 if (nxt_slow_path(ctx_impl->read_port == NULL 4039 || ctx_impl->read_port->out_fd == -1)) 4040 { 4041 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); 4042 4043 return; 4044 } 4045 4046 memset(&msg, 0, sizeof(nxt_port_msg_t)); 4047 4048 msg.type = _NXT_PORT_MSG_RPC_READY; 4049 4050 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 4051 &msg, sizeof(msg), NULL, 0); 4052 } 4053 4054 4055 static void 4056 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
    mmaps->allocated_chunks = 0;
}


/* Take a reference on a process descriptor. */
nxt_inline void
nxt_unit_process_use(nxt_unit_process_t *process)
{
    nxt_atomic_fetch_add(&process->use_count, 1);
}


/* Drop a reference; the last release frees the descriptor. */
nxt_inline void
nxt_unit_process_release(nxt_unit_process_t *process)
{
    long  c;

    c = nxt_atomic_fetch_add(&process->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);

        nxt_unit_free(NULL, process);
    }
}


/* Unmap all segments, free the elts array, and destroy the mutex. */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        nxt_unit_free(NULL, mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Look up incoming segment 'id' for 'rbuf'.  Called with mmaps->mutex held.
 * On NXT_UNIT_OK the mutex remains held and *hdr is the attached header.
 * If the segment is not attached yet, the rbuf is parked on the segment's
 * awaiting_rbuf queue, the mutex is RELEASED, a GET_MMAP request is sent
 * (first waiter only) and NXT_UNIT_AGAIN is returned.  On error the mutex
 * is also released.
 */
static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
    pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
    nxt_unit_read_buf_t *rbuf)
{
    int                  res, need_rbuf;
    nxt_unit_mmap_t      *mm;
    nxt_unit_ctx_impl_t  *ctx_impl;

    mm = nxt_unit_mmap_at(mmaps, id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate mmap");

        pthread_mutex_unlock(&mmaps->mutex);

        *hdr = NULL;

        return NXT_UNIT_ERROR;
    }

    *hdr = mm->hdr;

    if (nxt_fast_path(*hdr != NULL)) {
        return NXT_UNIT_OK;
    }

    /* Only the first waiter asks the router for the missing segment. */
    need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);

    nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);

    pthread_mutex_unlock(&mmaps->mutex);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    if (need_rbuf) {
        res = nxt_unit_get_mmap(ctx, pid, id);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_AGAIN;
}


/*
 * Translate the array of nxt_port_mmap_msg_t records in 'recv_msg' into a
 * chain of incoming mmap buffers.  May return NXT_UNIT_AGAIN when a
 * referenced segment is not attached yet (rbuf is then parked by
 * nxt_unit_check_rbuf_mmap).  On non-OK results all buffers allocated here
 * are released.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_read_buf_t *rbuf)
{
    int                     res;
    void                    *start;
    uint32_t                size;
    nxt_unit_impl_t         *lib;
    nxt_unit_mmaps_t        *mmaps;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* Allocating buffer structures. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mmaps = &lib->incoming;

    pthread_mutex_lock(&mmaps->mutex);

    for (; mmap_msg < end; mmap_msg++) {
        res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
                                       recv_msg->pid, mmap_msg->mmap_id,
                                       &hdr, rbuf);

        /* On AGAIN/ERROR the mutex has already been released. */
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return res;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* The first record also becomes the logical message payload. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&mmaps->mutex);

    return NXT_UNIT_OK;
}


/*
 * Ask the router to (re)send the descriptor of incoming segment 'id'
 * belonging to process 'pid'.  The reply arrives on this context's read
 * port.
 */
static int
nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_mmap_t  get_mmap;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_MMAP;

    m.get_mmap.id = id;

    nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Return the chunks [start, start+size) of segment 'hdr' to the free map.
 * Poisons the freed memory with 0xA5, updates the outgoing allocation
 * counter, and sends a SHM_ACK to the producer if it was waiting for space
 * (oosm flag set).
 */
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
    void *start, uint32_t size)
{
    int              freed_chunks;
    u_char           *p, *end;
    nxt_chunk_id_t   c;
    nxt_unit_impl_t  *lib;

    /* Poison released memory to catch use-after-free by either side. */
    memset(start, 0xA5, size);

    p = start;
    end = p + size;
    c = nxt_port_mmap_chunk_id(hdr, p);
    freed_chunks = 0;

    while (p < end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
        freed_chunks++;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Segment we produced: shrink the outgoing allocation counter. */
    if (hdr->src_pid == lib->pid && freed_chunks != 0) {
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);

        nxt_unit_debug(ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);
    }

    /*
     * Segment we consume: if the producer flagged out-of-shared-memory,
     * clear the flag exactly once and acknowledge the freed space.
     */
    if (hdr->dst_pid == lib->pid
        && freed_chunks != 0
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        nxt_unit_send_shm_ack(ctx, hdr->src_pid);
    }
}


/*
 * Send an _NXT_PORT_MSG_SHM_ACK to the router, telling the producer 'pid'
 * that shared memory chunks have been released.
 */
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_SHM_ACK;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/* lvlhsh test callback: match a process entry by its pid key. */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_process_t  *process;

    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/* Fill an lvlhsh query keyed by pid for the processes hash. */
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
    lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
    lhq->key.length = sizeof(*pid);
    lhq->key.start = (u_char *) pid;
    lhq->proto = &lvlhsh_processes_proto;
}


/*
 * Find the process descriptor for 'pid', creating and inserting one if
 * absent.  The returned descriptor carries an extra reference for the
 * caller (new descriptors start at use_count == 2: hash + caller).
 * Returns NULL on allocation or insertion failure.
 */
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
        process = lhq.value;
        nxt_unit_process_use(process);

        return process;
    }

    process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);

        return NULL;
    }

    process->pid = pid;
    process->use_count = 2;
    process->next_port_id = 0;
    process->lib = lib;

    nxt_queue_init(&process->ports);

    lhq.replace = 0;
    lhq.value = process;

    switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {

    case NXT_OK:
        break;

    default:
        nxt_unit_alert(ctx, "process %d insert failed", (int) pid);

        nxt_unit_free(ctx, process);
        process = NULL;
        break;
    }

    return process;
}


/*
 * Look up (and optionally remove) the process descriptor for 'pid'.
 * A plain find takes a reference for the caller; a removal transfers the
 * hash's reference instead.
 */
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
    int                 rc;
    nxt_lvlhsh_query_t  lhq;

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (remove) {
        rc = nxt_lvlhsh_delete(&lib->processes, &lhq);

    } else {
        rc = nxt_lvlhsh_find(&lib->processes, &lhq);
    }

    if (rc == NXT_OK) {
        if (!remove) {
            nxt_unit_process_use(lhq.value);
        }

        return lhq.value;
    }

    return NULL;
}


/* Remove and return an arbitrary process descriptor (used for teardown). */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
}


/*
 * Public API: run the context's event loop until it goes offline or an
 * error occurs.  On error the library is asked to quit.
 */
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rc = nxt_unit_run_once_impl(ctx);

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_quit(ctx);
            break;
        }
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Public API: process a single message (reference-counted wrapper). */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_run_once_impl(ctx);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * One event-loop iteration: read a message, process it, then drain any
 * pending read buffers and ready requests produced as side effects.
 */
static int
nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_read_buf(ctx, rbuf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_read_buf_release(ctx, rbuf);

        return rc;
    }

    /* process_msg consumes rbuf (releases it itself). */
    rc = nxt_unit_process_msg(ctx, rbuf);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    return rc;
}


/*
 * Read one message for the context.  When the context is idle and ready,
 * it first tries the lock-free port/app queues and falls back to poll()ing
 * both the private and the shared port descriptors.
 */
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                   nevents, res, err;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port_impl;
    struct pollfd         fds[2];

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Waiting for something specific (or not ready): blocking port read. */
    if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
        return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
    }

    port_impl =
        nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
                         port);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

retry:

    /* Fast path: dequeue from the private port's shared-memory queue. */
    if (port_impl->from_socket == 0) {
        res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_OK) {
            if (nxt_unit_is_read_socket(rbuf)) {
                /* Sentinel: the real message must be read from the socket. */
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               port_impl->from_socket);

            } else {
                nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               (int) rbuf->size);

                return NXT_UNIT_OK;
            }
        }
    }

    /* Next: try the shared application queue. */
    res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (res == NXT_UNIT_OK) {
        return NXT_UNIT_OK;
    }

    /* Both queues empty: block on the two descriptors. */
    fds[0].fd = ctx_impl->read_port->in_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    fds[1].fd = lib->shared_port->in_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    nevents = poll(fds, 2, -1);
    if (nxt_slow_path(nevents == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
                       fds[0].fd, fds[1].fd, strerror(err), err);

        rbuf->size = -1;

        return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    if ((fds[0].revents & POLLIN) != 0) {
        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    if ((fds[1].revents & POLLIN) != 0) {
        res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    return NXT_UNIT_ERROR;
}


/*
 * Process read buffers queued on the context by other threads.  The list
 * is detached under the mutex and processed outside it; after the first
 * error the remaining buffers are only released.
 */
static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_queue_t          pending_rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    nxt_queue_init(&pending_rbuf);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return NXT_UNIT_OK;
    }

    nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->pending_rbuf);

    pthread_mutex_unlock(&ctx_impl->mutex);

    rc = NXT_UNIT_OK;

    nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {

        if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
            rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);

        } else {
            nxt_unit_read_buf_release(ctx, rbuf);
        }

    } nxt_queue_loop;

    return rc;
}


/*
 * Dispatch requests whose response port became usable: acknowledge their
 * headers and invoke the application's request handler.
 */
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
    int                           res;
    nxt_queue_t                   ready_req;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    nxt_queue_init(&ready_req);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return;
    }

    /* Detach the ready list so handlers run outside the mutex. */
    nxt_queue_add(&ready_req, &ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->ready_req);

    pthread_mutex_unlock(&ctx_impl->mutex);

    nxt_queue_each(req_impl, &ready_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

        req = &req_impl->req;

        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            continue;
        }

        /* Body not fully buffered yet: register for incremental delivery. */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                continue;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                continue;
            }
        }

        lib->callbacks.request_handler(&req_impl->req);

    } nxt_queue_loop;
}


/*
 * Public API: event loop serving only this context's private read port
 * (no shared-port polling).
 */
int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        /* process_msg owns rbuf from here on, including error cases. */
        rc = nxt_unit_process_msg(ctx, rbuf);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        rc = nxt_unit_process_pending_rbuf(ctx);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        nxt_unit_process_ready_req(ctx);
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Test whether rbuf holds a bare READ_QUEUE control message. */
nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
    }

    return 0;
}


/* Test whether rbuf holds the one-byte READ_SOCKET sentinel. */
nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
{
    if (nxt_fast_path(rbuf->size == 1)) {
        return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
    }

    return 0;
}


/* Test whether rbuf holds a bare SHM_ACK control message. */
nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
    }

    return 0;
}


/* Test whether rbuf holds a bare QUIT control message. */
nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_QUIT;
    }

    return 0;
}


/*
 * Public API: event loop serving only the shared application port
 * (used by worker threads that do not own a private port).
 */
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }

        rc = nxt_unit_process_msg(ctx, rbuf);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        rc = nxt_unit_process_pending_rbuf(ctx);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        nxt_unit_process_ready_req(ctx);
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Public API: true when 'ctx' is the library's main context. */
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    return (ctx == &lib->main_ctx.ctx);
}


/* Public API: reference-counted wrapper around the port message loop. */
int
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_process_port_msg_impl(ctx, port);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Drain and process messages from 'port' (either the shared port or a
 * private one) until a recv would block, the context goes offline, or an
 * error occurs.
 */
static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

retry:

    if (port == lib->shared_port) {
        rc = nxt_unit_shared_port_recv(ctx, port, rbuf);

    } else {
        rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
    }

    if (rc != NXT_UNIT_OK) {
        nxt_unit_read_buf_release(ctx, rbuf);
        return rc;
    }

    rc = nxt_unit_process_msg(ctx, rbuf);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    /* Grab a fresh buffer and loop while the context remains online. */
    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (ctx_impl->online) {
        goto retry;
    }

    return rc;
}


/* Public API: release the caller's reference on the context. */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_release(ctx);
}


/*
 * Public API: create an additional context (e.g. for a worker thread).
 * Allocates the context, creates its private read port with a shared
 * memory queue, and announces the port to the router.  Returns NULL on
 * failure.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                   rc, queue_fd;
    void                  *mem;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *port;
    nxt_unit_ctx_impl_t   *new_ctx;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
                              + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_free(ctx, new_ctx);

        return NULL;
    }

    queue_fd = -1;

    port = nxt_unit_create_port(&new_ctx->ctx);
    if (nxt_slow_path(port == NULL)) {
        goto fail;
    }

    new_ctx->read_port = port;

    queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    port_impl->queue = mem;

    /* The router receives the new port together with its queue fd. */
    rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    nxt_unit_close(queue_fd);

    return &new_ctx->ctx;

fail:

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* ctx_release tears down everything ctx_init/create_port set up. */
    nxt_unit_ctx_release(&new_ctx->ctx);

    return NULL;
}


/*
 * Destroy a context: abort active requests, free cached buffers, request
 * and websocket pools, unregister the read port, and drop the library
 * reference.  The main context's memory itself is not freed here.
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_read_buf_t              *rbuf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        nxt_unit_free(&ctx_impl->ctx, mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);

    } nxt_queue_loop;

    nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
    {
        /* The embedded read buffer is part of ctx_impl; don't free it. */
        if (rbuf != &ctx_impl->ctx_read_buf) {
            nxt_unit_free(&ctx_impl->ctx, rbuf);
        }
    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_remove(&ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
        nxt_unit_port_release(ctx_impl->read_port);
    }

    if (ctx_impl != &lib->main_ctx) {
        nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms.
*/ 5154 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET) 5155 #define NXT_UNIX_SOCKET SOCK_SEQPACKET 5156 #else 5157 #define NXT_UNIX_SOCKET SOCK_DGRAM 5158 #endif 5159 5160 5161 void 5162 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) 5163 { 5164 nxt_unit_port_hash_id_t port_hash_id; 5165 5166 port_hash_id.pid = pid; 5167 port_hash_id.id = id; 5168 5169 port_id->pid = pid; 5170 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id)); 5171 port_id->id = id; 5172 } 5173 5174 5175 static nxt_unit_port_t * 5176 nxt_unit_create_port(nxt_unit_ctx_t *ctx) 5177 { 5178 int rc, port_sockets[2]; 5179 nxt_unit_impl_t *lib; 5180 nxt_unit_port_t new_port, *port; 5181 nxt_unit_process_t *process; 5182 5183 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5184 5185 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 5186 if (nxt_slow_path(rc != 0)) { 5187 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 5188 strerror(errno), errno); 5189 5190 return NULL; 5191 } 5192 5193 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 5194 port_sockets[0], port_sockets[1]); 5195 5196 pthread_mutex_lock(&lib->mutex); 5197 5198 process = nxt_unit_process_get(ctx, lib->pid); 5199 if (nxt_slow_path(process == NULL)) { 5200 pthread_mutex_unlock(&lib->mutex); 5201 5202 nxt_unit_close(port_sockets[0]); 5203 nxt_unit_close(port_sockets[1]); 5204 5205 return NULL; 5206 } 5207 5208 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 5209 5210 new_port.in_fd = port_sockets[0]; 5211 new_port.out_fd = port_sockets[1]; 5212 new_port.data = NULL; 5213 5214 pthread_mutex_unlock(&lib->mutex); 5215 5216 nxt_unit_process_release(process); 5217 5218 port = nxt_unit_add_port(ctx, &new_port, NULL); 5219 if (nxt_slow_path(port == NULL)) { 5220 nxt_unit_close(port_sockets[0]); 5221 nxt_unit_close(port_sockets[1]); 5222 } 5223 5224 return port; 5225 } 5226 5227 5228 static int 5229 nxt_unit_send_port(nxt_unit_ctx_t *ctx, 
nxt_unit_port_t *dst, 5230 nxt_unit_port_t *port, int queue_fd) 5231 { 5232 ssize_t res; 5233 nxt_unit_impl_t *lib; 5234 int fds[2] = { port->out_fd, queue_fd }; 5235 5236 struct { 5237 nxt_port_msg_t msg; 5238 nxt_port_msg_new_port_t new_port; 5239 } m; 5240 5241 union { 5242 struct cmsghdr cm; 5243 char space[CMSG_SPACE(sizeof(int) * 2)]; 5244 } cmsg; 5245 5246 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5247 5248 m.msg.stream = 0; 5249 m.msg.pid = lib->pid; 5250 m.msg.reply_port = 0; 5251 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 5252 m.msg.last = 0; 5253 m.msg.mmap = 0; 5254 m.msg.nf = 0; 5255 m.msg.mf = 0; 5256 m.msg.tracking = 0; 5257 5258 m.new_port.id = port->id.id; 5259 m.new_port.pid = port->id.pid; 5260 m.new_port.type = NXT_PROCESS_APP; 5261 m.new_port.max_size = 16 * 1024; 5262 m.new_port.max_share = 64 * 1024; 5263 5264 memset(&cmsg, 0, sizeof(cmsg)); 5265 5266 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); 5267 cmsg.cm.cmsg_level = SOL_SOCKET; 5268 cmsg.cm.cmsg_type = SCM_RIGHTS; 5269 5270 /* 5271 * memcpy() is used instead of simple 5272 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 5273 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 5274 * dereferencing type-punned pointer will break strict-aliasing rules 5275 * 5276 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 5277 * in the same simple assignment as in the code above. 5278 */ 5279 memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); 5280 5281 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); 5282 5283 return (res == sizeof(m)) ? 
NXT_UNIT_OK : NXT_UNIT_ERROR; 5284 } 5285 5286 5287 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) 5288 { 5289 nxt_unit_port_impl_t *port_impl; 5290 5291 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5292 5293 nxt_atomic_fetch_add(&port_impl->use_count, 1); 5294 } 5295 5296 5297 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) 5298 { 5299 long c; 5300 nxt_unit_port_impl_t *port_impl; 5301 5302 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5303 5304 c = nxt_atomic_fetch_add(&port_impl->use_count, -1); 5305 5306 if (c == 1) { 5307 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d", 5308 (int) port->id.pid, (int) port->id.id, 5309 port->in_fd, port->out_fd); 5310 5311 nxt_unit_process_release(port_impl->process); 5312 5313 if (port->in_fd != -1) { 5314 nxt_unit_close(port->in_fd); 5315 5316 port->in_fd = -1; 5317 } 5318 5319 if (port->out_fd != -1) { 5320 nxt_unit_close(port->out_fd); 5321 5322 port->out_fd = -1; 5323 } 5324 5325 if (port_impl->queue != NULL) { 5326 munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID) 5327 ? 
sizeof(nxt_app_queue_t) 5328 : sizeof(nxt_port_queue_t)); 5329 } 5330 5331 nxt_unit_free(NULL, port_impl); 5332 } 5333 } 5334 5335 5336 static nxt_unit_port_t * 5337 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) 5338 { 5339 int rc; 5340 nxt_queue_t awaiting_req; 5341 nxt_unit_impl_t *lib; 5342 nxt_unit_port_t *old_port; 5343 nxt_unit_process_t *process; 5344 nxt_unit_ctx_impl_t *ctx_impl; 5345 nxt_unit_port_impl_t *new_port, *old_port_impl; 5346 nxt_unit_request_info_impl_t *req_impl; 5347 5348 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5349 5350 pthread_mutex_lock(&lib->mutex); 5351 5352 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); 5353 5354 if (nxt_slow_path(old_port != NULL)) { 5355 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " 5356 "in_fd %d out_fd %d queue %p", 5357 port->id.pid, port->id.id, 5358 port->in_fd, port->out_fd, queue); 5359 5360 if (old_port->data == NULL) { 5361 old_port->data = port->data; 5362 port->data = NULL; 5363 } 5364 5365 if (old_port->in_fd == -1) { 5366 old_port->in_fd = port->in_fd; 5367 port->in_fd = -1; 5368 } 5369 5370 if (port->in_fd != -1) { 5371 nxt_unit_close(port->in_fd); 5372 port->in_fd = -1; 5373 } 5374 5375 if (old_port->out_fd == -1) { 5376 old_port->out_fd = port->out_fd; 5377 port->out_fd = -1; 5378 } 5379 5380 if (port->out_fd != -1) { 5381 nxt_unit_close(port->out_fd); 5382 port->out_fd = -1; 5383 } 5384 5385 *port = *old_port; 5386 5387 nxt_queue_init(&awaiting_req); 5388 5389 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); 5390 5391 if (old_port_impl->queue == NULL) { 5392 old_port_impl->queue = queue; 5393 } 5394 5395 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { 5396 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); 5397 nxt_queue_init(&old_port_impl->awaiting_req); 5398 } 5399 5400 old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); 5401 5402 pthread_mutex_unlock(&lib->mutex); 5403 
5404 if (lib->callbacks.add_port != NULL 5405 && (port->in_fd != -1 || port->out_fd != -1)) 5406 { 5407 lib->callbacks.add_port(ctx, old_port); 5408 } 5409 5410 nxt_queue_each(req_impl, &awaiting_req, 5411 nxt_unit_request_info_impl_t, port_wait_link) 5412 { 5413 nxt_queue_remove(&req_impl->port_wait_link); 5414 5415 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, 5416 ctx); 5417 5418 pthread_mutex_lock(&ctx_impl->mutex); 5419 5420 nxt_queue_insert_tail(&ctx_impl->ready_req, 5421 &req_impl->port_wait_link); 5422 5423 pthread_mutex_unlock(&ctx_impl->mutex); 5424 5425 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 5426 5427 nxt_unit_awake_ctx(ctx, ctx_impl); 5428 5429 } nxt_queue_loop; 5430 5431 return old_port; 5432 } 5433 5434 new_port = NULL; 5435 5436 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", 5437 port->id.pid, port->id.id, 5438 port->in_fd, port->out_fd, queue); 5439 5440 process = nxt_unit_process_get(ctx, port->id.pid); 5441 if (nxt_slow_path(process == NULL)) { 5442 goto unlock; 5443 } 5444 5445 if (port->id.id != NXT_UNIT_SHARED_PORT_ID 5446 && port->id.id >= process->next_port_id) 5447 { 5448 process->next_port_id = port->id.id + 1; 5449 } 5450 5451 new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); 5452 if (nxt_slow_path(new_port == NULL)) { 5453 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", 5454 port->id.pid, port->id.id); 5455 5456 goto unlock; 5457 } 5458 5459 new_port->port = *port; 5460 5461 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 5462 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 5463 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", 5464 port->id.pid, port->id.id); 5465 5466 nxt_unit_free(ctx, new_port); 5467 5468 new_port = NULL; 5469 5470 goto unlock; 5471 } 5472 5473 nxt_queue_insert_tail(&process->ports, &new_port->link); 5474 5475 new_port->use_count = 2; 5476 new_port->process = process; 5477 new_port->ready = (port->in_fd != -1 || port->out_fd != -1); 
5478 new_port->queue = queue; 5479 new_port->from_socket = 0; 5480 new_port->socket_rbuf = NULL; 5481 5482 nxt_queue_init(&new_port->awaiting_req); 5483 5484 process = NULL; 5485 5486 unlock: 5487 5488 pthread_mutex_unlock(&lib->mutex); 5489 5490 if (nxt_slow_path(process != NULL)) { 5491 nxt_unit_process_release(process); 5492 } 5493 5494 if (lib->callbacks.add_port != NULL 5495 && new_port != NULL 5496 && (port->in_fd != -1 || port->out_fd != -1)) 5497 { 5498 lib->callbacks.add_port(ctx, &new_port->port); 5499 } 5500 5501 return &new_port->port; 5502 } 5503 5504 5505 static void 5506 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 5507 { 5508 nxt_unit_port_t *port; 5509 nxt_unit_port_impl_t *port_impl; 5510 5511 pthread_mutex_lock(&lib->mutex); 5512 5513 port = nxt_unit_remove_port_unsafe(lib, port_id); 5514 5515 if (nxt_fast_path(port != NULL)) { 5516 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5517 5518 nxt_queue_remove(&port_impl->link); 5519 } 5520 5521 pthread_mutex_unlock(&lib->mutex); 5522 5523 if (lib->callbacks.remove_port != NULL && port != NULL) { 5524 lib->callbacks.remove_port(&lib->unit, port); 5525 } 5526 5527 if (nxt_fast_path(port != NULL)) { 5528 nxt_unit_port_release(port); 5529 } 5530 } 5531 5532 5533 static nxt_unit_port_t * 5534 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 5535 { 5536 nxt_unit_port_t *port; 5537 5538 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 5539 if (nxt_slow_path(port == NULL)) { 5540 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", 5541 (int) port_id->pid, (int) port_id->id); 5542 5543 return NULL; 5544 } 5545 5546 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", 5547 (int) port_id->pid, (int) port_id->id, 5548 port->in_fd, port->out_fd, port->data); 5549 5550 return port; 5551 } 5552 5553 5554 static void 5555 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) 5556 { 5557 nxt_unit_process_t 
*process; 5558 5559 pthread_mutex_lock(&lib->mutex); 5560 5561 process = nxt_unit_process_find(lib, pid, 1); 5562 if (nxt_slow_path(process == NULL)) { 5563 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); 5564 5565 pthread_mutex_unlock(&lib->mutex); 5566 5567 return; 5568 } 5569 5570 nxt_unit_remove_process(lib, process); 5571 5572 if (lib->callbacks.remove_pid != NULL) { 5573 lib->callbacks.remove_pid(&lib->unit, pid); 5574 } 5575 } 5576 5577 5578 static void 5579 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) 5580 { 5581 nxt_queue_t ports; 5582 nxt_unit_port_impl_t *port; 5583 5584 nxt_queue_init(&ports); 5585 5586 nxt_queue_add(&ports, &process->ports); 5587 5588 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 5589 5590 nxt_unit_remove_port_unsafe(lib, &port->port.id); 5591 5592 } nxt_queue_loop; 5593 5594 pthread_mutex_unlock(&lib->mutex); 5595 5596 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 5597 5598 nxt_queue_remove(&port->link); 5599 5600 if (lib->callbacks.remove_port != NULL) { 5601 lib->callbacks.remove_port(&lib->unit, &port->port); 5602 } 5603 5604 nxt_unit_port_release(&port->port); 5605 5606 } nxt_queue_loop; 5607 5608 nxt_unit_process_release(process); 5609 } 5610 5611 5612 static void 5613 nxt_unit_quit(nxt_unit_ctx_t *ctx) 5614 { 5615 nxt_port_msg_t msg; 5616 nxt_unit_impl_t *lib; 5617 nxt_unit_ctx_impl_t *ctx_impl; 5618 5619 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5620 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5621 5622 if (!ctx_impl->online) { 5623 return; 5624 } 5625 5626 ctx_impl->online = 0; 5627 5628 if (lib->callbacks.quit != NULL) { 5629 lib->callbacks.quit(ctx); 5630 } 5631 5632 if (ctx != &lib->main_ctx.ctx) { 5633 return; 5634 } 5635 5636 memset(&msg, 0, sizeof(nxt_port_msg_t)); 5637 5638 msg.pid = lib->pid; 5639 msg.type = _NXT_PORT_MSG_QUIT; 5640 5641 pthread_mutex_lock(&lib->mutex); 5642 5643 nxt_queue_each(ctx_impl, 
&lib->contexts, nxt_unit_ctx_impl_t, link) { 5644 5645 if (ctx == &ctx_impl->ctx 5646 || ctx_impl->read_port == NULL 5647 || ctx_impl->read_port->out_fd == -1) 5648 { 5649 continue; 5650 } 5651 5652 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 5653 &msg, sizeof(msg), NULL, 0); 5654 5655 } nxt_queue_loop; 5656 5657 pthread_mutex_unlock(&lib->mutex); 5658 } 5659 5660 5661 static int 5662 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 5663 { 5664 ssize_t res; 5665 nxt_unit_impl_t *lib; 5666 nxt_unit_ctx_impl_t *ctx_impl; 5667 5668 struct { 5669 nxt_port_msg_t msg; 5670 nxt_port_msg_get_port_t get_port; 5671 } m; 5672 5673 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5674 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5675 5676 memset(&m.msg, 0, sizeof(nxt_port_msg_t)); 5677 5678 m.msg.pid = lib->pid; 5679 m.msg.reply_port = ctx_impl->read_port->id.id; 5680 m.msg.type = _NXT_PORT_MSG_GET_PORT; 5681 5682 m.get_port.id = port_id->id; 5683 m.get_port.pid = port_id->pid; 5684 5685 nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, 5686 (int) port_id->id); 5687 5688 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); 5689 if (nxt_slow_path(res != sizeof(m))) { 5690 return NXT_UNIT_ERROR; 5691 } 5692 5693 return NXT_UNIT_OK; 5694 } 5695 5696 5697 static ssize_t 5698 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 5699 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 5700 { 5701 int notify; 5702 ssize_t ret; 5703 nxt_int_t rc; 5704 nxt_port_msg_t msg; 5705 nxt_unit_impl_t *lib; 5706 nxt_unit_port_impl_t *port_impl; 5707 5708 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5709 5710 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5711 if (port_impl->queue != NULL && oob_size == 0 5712 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) 5713 { 5714 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); 5715 if (nxt_slow_path(rc != NXT_OK)) 
{ 5716 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", 5717 (int) port->id.pid, (int) port->id.id); 5718 5719 return -1; 5720 } 5721 5722 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", 5723 (int) port->id.pid, (int) port->id.id, 5724 (int) buf_size, notify); 5725 5726 if (notify) { 5727 memcpy(&msg, buf, sizeof(nxt_port_msg_t)); 5728 5729 msg.type = _NXT_PORT_MSG_READ_QUEUE; 5730 5731 if (lib->callbacks.port_send == NULL) { 5732 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, 5733 sizeof(nxt_port_msg_t), NULL, 0); 5734 5735 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", 5736 (int) port->id.pid, (int) port->id.id, 5737 (int) ret); 5738 5739 } else { 5740 ret = lib->callbacks.port_send(ctx, port, &msg, 5741 sizeof(nxt_port_msg_t), NULL, 0); 5742 5743 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", 5744 (int) port->id.pid, (int) port->id.id, 5745 (int) ret); 5746 } 5747 5748 } 5749 5750 return buf_size; 5751 } 5752 5753 if (port_impl->queue != NULL) { 5754 msg.type = _NXT_PORT_MSG_READ_SOCKET; 5755 5756 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); 5757 if (nxt_slow_path(rc != NXT_OK)) { 5758 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", 5759 (int) port->id.pid, (int) port->id.id); 5760 5761 return -1; 5762 } 5763 5764 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", 5765 (int) port->id.pid, (int) port->id.id, notify); 5766 } 5767 5768 if (lib->callbacks.port_send != NULL) { 5769 ret = lib->callbacks.port_send(ctx, port, buf, buf_size, 5770 oob, oob_size); 5771 5772 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", 5773 (int) port->id.pid, (int) port->id.id, 5774 (int) ret); 5775 5776 } else { 5777 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, 5778 oob, oob_size); 5779 5780 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", 5781 (int) port->id.pid, (int) port->id.id, 5782 (int) ret); 5783 } 5784 5785 return ret; 5786 } 5787 5788 5789 static ssize_t 5790 
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 5791 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 5792 { 5793 int err; 5794 ssize_t res; 5795 struct iovec iov[1]; 5796 struct msghdr msg; 5797 5798 iov[0].iov_base = (void *) buf; 5799 iov[0].iov_len = buf_size; 5800 5801 msg.msg_name = NULL; 5802 msg.msg_namelen = 0; 5803 msg.msg_iov = iov; 5804 msg.msg_iovlen = 1; 5805 msg.msg_flags = 0; 5806 msg.msg_control = (void *) oob; 5807 msg.msg_controllen = oob_size; 5808 5809 retry: 5810 5811 res = sendmsg(fd, &msg, 0); 5812 5813 if (nxt_slow_path(res == -1)) { 5814 err = errno; 5815 5816 if (err == EINTR) { 5817 goto retry; 5818 } 5819 5820 /* 5821 * FIXME: This should be "alert" after router graceful shutdown 5822 * implementation. 5823 */ 5824 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", 5825 fd, (int) buf_size, strerror(err), err); 5826 5827 } else { 5828 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, 5829 (int) res); 5830 } 5831 5832 return res; 5833 } 5834 5835 5836 static int 5837 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 5838 nxt_unit_read_buf_t *rbuf) 5839 { 5840 int res, read; 5841 nxt_unit_port_impl_t *port_impl; 5842 5843 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5844 5845 read = 0; 5846 5847 retry: 5848 5849 if (port_impl->from_socket > 0) { 5850 if (port_impl->socket_rbuf != NULL 5851 && port_impl->socket_rbuf->size > 0) 5852 { 5853 port_impl->from_socket--; 5854 5855 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); 5856 port_impl->socket_rbuf->size = 0; 5857 5858 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", 5859 (int) port->id.pid, (int) port->id.id, 5860 (int) rbuf->size); 5861 5862 return NXT_UNIT_OK; 5863 } 5864 5865 } else { 5866 res = nxt_unit_port_queue_recv(port, rbuf); 5867 5868 if (res == NXT_UNIT_OK) { 5869 if (nxt_unit_is_read_socket(rbuf)) { 5870 port_impl->from_socket++; 5871 5872 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket 
%d", 5873 (int) port->id.pid, (int) port->id.id, 5874 port_impl->from_socket); 5875 5876 goto retry; 5877 } 5878 5879 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", 5880 (int) port->id.pid, (int) port->id.id, 5881 (int) rbuf->size); 5882 5883 return NXT_UNIT_OK; 5884 } 5885 } 5886 5887 if (read) { 5888 return NXT_UNIT_AGAIN; 5889 } 5890 5891 res = nxt_unit_port_recv(ctx, port, rbuf); 5892 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 5893 return NXT_UNIT_ERROR; 5894 } 5895 5896 read = 1; 5897 5898 if (nxt_unit_is_read_queue(rbuf)) { 5899 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", 5900 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 5901 5902 goto retry; 5903 } 5904 5905 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", 5906 (int) port->id.pid, (int) port->id.id, 5907 (int) rbuf->size); 5908 5909 if (res == NXT_UNIT_AGAIN) { 5910 return NXT_UNIT_AGAIN; 5911 } 5912 5913 if (port_impl->from_socket > 0) { 5914 port_impl->from_socket--; 5915 5916 return NXT_UNIT_OK; 5917 } 5918 5919 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", 5920 (int) port->id.pid, (int) port->id.id, 5921 (int) rbuf->size); 5922 5923 if (port_impl->socket_rbuf == NULL) { 5924 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); 5925 5926 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { 5927 return NXT_UNIT_ERROR; 5928 } 5929 5930 port_impl->socket_rbuf->size = 0; 5931 } 5932 5933 if (port_impl->socket_rbuf->size > 0) { 5934 nxt_unit_alert(ctx, "too many port socket messages"); 5935 5936 return NXT_UNIT_ERROR; 5937 } 5938 5939 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); 5940 5941 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 5942 5943 goto retry; 5944 } 5945 5946 5947 nxt_inline void 5948 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) 5949 { 5950 memcpy(dst->buf, src->buf, src->size); 5951 dst->size = src->size; 5952 memcpy(dst->oob, src->oob, sizeof(src->oob)); 5953 } 5954 5955 5956 static int 5957 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, 
nxt_unit_port_t *port, 5958 nxt_unit_read_buf_t *rbuf) 5959 { 5960 int res; 5961 5962 retry: 5963 5964 res = nxt_unit_app_queue_recv(port, rbuf); 5965 5966 if (res == NXT_UNIT_AGAIN) { 5967 res = nxt_unit_port_recv(ctx, port, rbuf); 5968 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 5969 return NXT_UNIT_ERROR; 5970 } 5971 5972 if (nxt_unit_is_read_queue(rbuf)) { 5973 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", 5974 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 5975 5976 goto retry; 5977 } 5978 } 5979 5980 return res; 5981 } 5982 5983 5984 static int 5985 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 5986 nxt_unit_read_buf_t *rbuf) 5987 { 5988 int fd, err; 5989 struct iovec iov[1]; 5990 struct msghdr msg; 5991 nxt_unit_impl_t *lib; 5992 5993 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5994 5995 if (lib->callbacks.port_recv != NULL) { 5996 rbuf->size = lib->callbacks.port_recv(ctx, port, 5997 rbuf->buf, sizeof(rbuf->buf), 5998 rbuf->oob, sizeof(rbuf->oob)); 5999 6000 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", 6001 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 6002 6003 if (nxt_slow_path(rbuf->size < 0)) { 6004 return NXT_UNIT_ERROR; 6005 } 6006 6007 return NXT_UNIT_OK; 6008 } 6009 6010 iov[0].iov_base = rbuf->buf; 6011 iov[0].iov_len = sizeof(rbuf->buf); 6012 6013 msg.msg_name = NULL; 6014 msg.msg_namelen = 0; 6015 msg.msg_iov = iov; 6016 msg.msg_iovlen = 1; 6017 msg.msg_flags = 0; 6018 msg.msg_control = rbuf->oob; 6019 msg.msg_controllen = sizeof(rbuf->oob); 6020 6021 fd = port->in_fd; 6022 6023 retry: 6024 6025 rbuf->size = recvmsg(fd, &msg, 0); 6026 6027 if (nxt_slow_path(rbuf->size == -1)) { 6028 err = errno; 6029 6030 if (err == EINTR) { 6031 goto retry; 6032 } 6033 6034 if (err == EAGAIN) { 6035 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", 6036 fd, strerror(err), err); 6037 6038 return NXT_UNIT_AGAIN; 6039 } 6040 6041 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", 6042 fd, strerror(err), 
err); 6043 6044 return NXT_UNIT_ERROR; 6045 } 6046 6047 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); 6048 6049 return NXT_UNIT_OK; 6050 } 6051 6052 6053 static int 6054 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) 6055 { 6056 nxt_unit_port_impl_t *port_impl; 6057 6058 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 6059 6060 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); 6061 6062 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; 6063 } 6064 6065 6066 static int 6067 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) 6068 { 6069 uint32_t cookie; 6070 nxt_port_msg_t *port_msg; 6071 nxt_app_queue_t *queue; 6072 nxt_unit_port_impl_t *port_impl; 6073 6074 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 6075 queue = port_impl->queue; 6076 6077 retry: 6078 6079 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); 6080 6081 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); 6082 6083 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { 6084 port_msg = (nxt_port_msg_t *) rbuf->buf; 6085 6086 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { 6087 return NXT_UNIT_OK; 6088 } 6089 6090 nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); 6091 6092 goto retry; 6093 } 6094 6095 return (rbuf->size == -1) ? 
NXT_UNIT_AGAIN : NXT_UNIT_OK; 6096 } 6097 6098 6099 nxt_inline int 6100 nxt_unit_close(int fd) 6101 { 6102 int res; 6103 6104 res = close(fd); 6105 6106 if (nxt_slow_path(res == -1)) { 6107 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)", 6108 fd, strerror(errno), errno); 6109 6110 } else { 6111 nxt_unit_debug(NULL, "close(%d): %d", fd, res); 6112 } 6113 6114 return res; 6115 } 6116 6117 6118 static int 6119 nxt_unit_fd_blocking(int fd) 6120 { 6121 int nb; 6122 6123 nb = 0; 6124 6125 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { 6126 nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 6127 fd, strerror(errno), errno); 6128 6129 return NXT_UNIT_ERROR; 6130 } 6131 6132 return NXT_UNIT_OK; 6133 } 6134 6135 6136 static nxt_int_t 6137 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 6138 { 6139 nxt_unit_port_t *port; 6140 nxt_unit_port_hash_id_t *port_id; 6141 6142 port = data; 6143 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start; 6144 6145 if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t) 6146 && port_id->pid == port->id.pid 6147 && port_id->id == port->id.id) 6148 { 6149 return NXT_OK; 6150 } 6151 6152 return NXT_DECLINED; 6153 } 6154 6155 6156 static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { 6157 NXT_LVLHSH_DEFAULT, 6158 nxt_unit_port_hash_test, 6159 nxt_unit_lvlhsh_alloc, 6160 nxt_unit_lvlhsh_free, 6161 }; 6162 6163 6164 static inline void 6165 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq, 6166 nxt_unit_port_hash_id_t *port_hash_id, 6167 nxt_unit_port_id_t *port_id) 6168 { 6169 port_hash_id->pid = port_id->pid; 6170 port_hash_id->id = port_id->id; 6171 6172 if (nxt_fast_path(port_id->hash != 0)) { 6173 lhq->key_hash = port_id->hash; 6174 6175 } else { 6176 lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id)); 6177 6178 port_id->hash = lhq->key_hash; 6179 6180 nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X", 6181 (int) port_id->pid, (int) port_id->id, 6182 (int) 
port_id->hash); 6183 } 6184 6185 lhq->key.length = sizeof(nxt_unit_port_hash_id_t); 6186 lhq->key.start = (u_char *) port_hash_id; 6187 lhq->proto = &lvlhsh_ports_proto; 6188 lhq->pool = NULL; 6189 } 6190 6191 6192 static int 6193 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) 6194 { 6195 nxt_int_t res; 6196 nxt_lvlhsh_query_t lhq; 6197 nxt_unit_port_hash_id_t port_hash_id; 6198 6199 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id); 6200 lhq.replace = 0; 6201 lhq.value = port; 6202 6203 res = nxt_lvlhsh_insert(port_hash, &lhq); 6204 6205 switch (res) { 6206 6207 case NXT_OK: 6208 return NXT_UNIT_OK; 6209 6210 default: 6211 return NXT_UNIT_ERROR; 6212 } 6213 } 6214 6215 6216 static nxt_unit_port_t * 6217 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, 6218 int remove) 6219 { 6220 nxt_int_t res; 6221 nxt_lvlhsh_query_t lhq; 6222 nxt_unit_port_hash_id_t port_hash_id; 6223 6224 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id); 6225 6226 if (remove) { 6227 res = nxt_lvlhsh_delete(port_hash, &lhq); 6228 6229 } else { 6230 res = nxt_lvlhsh_find(port_hash, &lhq); 6231 } 6232 6233 switch (res) { 6234 6235 case NXT_OK: 6236 if (!remove) { 6237 nxt_unit_port_use(lhq.value); 6238 } 6239 6240 return lhq.value; 6241 6242 default: 6243 return NULL; 6244 } 6245 } 6246 6247 6248 static nxt_int_t 6249 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 6250 { 6251 return NXT_OK; 6252 } 6253 6254 6255 static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { 6256 NXT_LVLHSH_DEFAULT, 6257 nxt_unit_request_hash_test, 6258 nxt_unit_lvlhsh_alloc, 6259 nxt_unit_lvlhsh_free, 6260 }; 6261 6262 6263 static int 6264 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 6265 nxt_unit_request_info_t *req) 6266 { 6267 uint32_t *stream; 6268 nxt_int_t res; 6269 nxt_lvlhsh_query_t lhq; 6270 nxt_unit_ctx_impl_t *ctx_impl; 6271 nxt_unit_request_info_impl_t *req_impl; 6272 6273 req_impl = nxt_container_of(req, 
nxt_unit_request_info_impl_t, req); 6274 if (req_impl->in_hash) { 6275 return NXT_UNIT_OK; 6276 } 6277 6278 stream = &req_impl->stream; 6279 6280 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 6281 lhq.key.length = sizeof(*stream); 6282 lhq.key.start = (u_char *) stream; 6283 lhq.proto = &lvlhsh_requests_proto; 6284 lhq.pool = NULL; 6285 lhq.replace = 0; 6286 lhq.value = req_impl; 6287 6288 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 6289 6290 pthread_mutex_lock(&ctx_impl->mutex); 6291 6292 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); 6293 6294 pthread_mutex_unlock(&ctx_impl->mutex); 6295 6296 switch (res) { 6297 6298 case NXT_OK: 6299 req_impl->in_hash = 1; 6300 return NXT_UNIT_OK; 6301 6302 default: 6303 return NXT_UNIT_ERROR; 6304 } 6305 } 6306 6307 6308 static nxt_unit_request_info_t * 6309 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) 6310 { 6311 nxt_int_t res; 6312 nxt_lvlhsh_query_t lhq; 6313 nxt_unit_ctx_impl_t *ctx_impl; 6314 nxt_unit_request_info_impl_t *req_impl; 6315 6316 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); 6317 lhq.key.length = sizeof(stream); 6318 lhq.key.start = (u_char *) &stream; 6319 lhq.proto = &lvlhsh_requests_proto; 6320 lhq.pool = NULL; 6321 6322 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 6323 6324 pthread_mutex_lock(&ctx_impl->mutex); 6325 6326 if (remove) { 6327 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); 6328 6329 } else { 6330 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); 6331 } 6332 6333 pthread_mutex_unlock(&ctx_impl->mutex); 6334 6335 switch (res) { 6336 6337 case NXT_OK: 6338 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, 6339 req); 6340 if (remove) { 6341 req_impl->in_hash = 0; 6342 } 6343 6344 return lhq.value; 6345 6346 default: 6347 return NULL; 6348 } 6349 } 6350 6351 6352 void 6353 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) 
6354 { 6355 int log_fd, n; 6356 char msg[NXT_MAX_ERROR_STR], *p, *end; 6357 pid_t pid; 6358 va_list ap; 6359 nxt_unit_impl_t *lib; 6360 6361 if (nxt_fast_path(ctx != NULL)) { 6362 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 6363 6364 pid = lib->pid; 6365 log_fd = lib->log_fd; 6366 6367 } else { 6368 pid = getpid(); 6369 log_fd = STDERR_FILENO; 6370 } 6371 6372 p = msg; 6373 end = p + sizeof(msg) - 1; 6374 6375 p = nxt_unit_snprint_prefix(p, end, pid, level); 6376 6377 va_start(ap, fmt); 6378 p += vsnprintf(p, end - p, fmt, ap); 6379 va_end(ap); 6380 6381 if (nxt_slow_path(p > end)) { 6382 memcpy(end - 5, "[...]", 5); 6383 p = end; 6384 } 6385 6386 *p++ = '\n'; 6387 6388 n = write(log_fd, msg, p - msg); 6389 if (nxt_slow_path(n < 0)) { 6390 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg); 6391 } 6392 } 6393 6394 6395 void 6396 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) 6397 { 6398 int log_fd, n; 6399 char msg[NXT_MAX_ERROR_STR], *p, *end; 6400 pid_t pid; 6401 va_list ap; 6402 nxt_unit_impl_t *lib; 6403 nxt_unit_request_info_impl_t *req_impl; 6404 6405 if (nxt_fast_path(req != NULL)) { 6406 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 6407 6408 pid = lib->pid; 6409 log_fd = lib->log_fd; 6410 6411 } else { 6412 pid = getpid(); 6413 log_fd = STDERR_FILENO; 6414 } 6415 6416 p = msg; 6417 end = p + sizeof(msg) - 1; 6418 6419 p = nxt_unit_snprint_prefix(p, end, pid, level); 6420 6421 if (nxt_fast_path(req != NULL)) { 6422 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 6423 6424 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); 6425 } 6426 6427 va_start(ap, fmt); 6428 p += vsnprintf(p, end - p, fmt, ap); 6429 va_end(ap); 6430 6431 if (nxt_slow_path(p > end)) { 6432 memcpy(end - 5, "[...]", 5); 6433 p = end; 6434 } 6435 6436 *p++ = '\n'; 6437 6438 n = write(log_fd, msg, p - msg); 6439 if (nxt_slow_path(n < 0)) { 6440 fprintf(stderr, "Failed to write log: 
%.*s", (int) (p - msg), msg); 6441 } 6442 } 6443 6444 6445 static const char * nxt_unit_log_levels[] = { 6446 "alert", 6447 "error", 6448 "warn", 6449 "notice", 6450 "info", 6451 "debug", 6452 }; 6453 6454 6455 static char * 6456 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 6457 { 6458 struct tm tm; 6459 struct timespec ts; 6460 6461 (void) clock_gettime(CLOCK_REALTIME, &ts); 6462 6463 #if (NXT_HAVE_LOCALTIME_R) 6464 (void) localtime_r(&ts.tv_sec, &tm); 6465 #else 6466 tm = *localtime(&ts.tv_sec); 6467 #endif 6468 6469 #if (NXT_DEBUG) 6470 p += snprintf(p, end - p, 6471 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 6472 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6473 tm.tm_hour, tm.tm_min, tm.tm_sec, 6474 (int) ts.tv_nsec / 1000000); 6475 #else 6476 p += snprintf(p, end - p, 6477 "%4d/%02d/%02d %02d:%02d:%02d ", 6478 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6479 tm.tm_hour, tm.tm_min, tm.tm_sec); 6480 #endif 6481 6482 p += snprintf(p, end - p, 6483 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 6484 (int) pid, 6485 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 6486 6487 return p; 6488 } 6489 6490 6491 static void * 6492 nxt_unit_lvlhsh_alloc(void *data, size_t size) 6493 { 6494 int err; 6495 void *p; 6496 6497 err = posix_memalign(&p, size, size); 6498 6499 if (nxt_fast_path(err == 0)) { 6500 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", 6501 (int) size, (int) size, p); 6502 return p; 6503 } 6504 6505 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", 6506 (int) size, (int) size, strerror(err), err); 6507 return NULL; 6508 } 6509 6510 6511 static void 6512 nxt_unit_lvlhsh_free(void *data, void *p) 6513 { 6514 nxt_unit_free(NULL, p); 6515 } 6516 6517 6518 void * 6519 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) 6520 { 6521 void *p; 6522 6523 p = malloc(size); 6524 6525 if (nxt_fast_path(p != NULL)) { 6526 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); 6527 6528 } else { 6529 nxt_unit_alert(ctx, "malloc(%d) 
failed: %s (%d)", 6530 (int) size, strerror(errno), errno); 6531 } 6532 6533 return p; 6534 } 6535 6536 6537 void 6538 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) 6539 { 6540 nxt_unit_debug(ctx, "free(%p)", p); 6541 6542 free(p); 6543 } 6544 6545 6546 static int 6547 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) 6548 { 6549 u_char c1, c2; 6550 nxt_int_t n; 6551 const u_char *s1, *s2; 6552 6553 s1 = p1; 6554 s2 = p2; 6555 6556 while (length-- != 0) { 6557 c1 = *s1++; 6558 c2 = *s2++; 6559 6560 c1 = nxt_lowcase(c1); 6561 c2 = nxt_lowcase(c2); 6562 6563 n = c1 - c2; 6564 6565 if (n != 0) { 6566 return n; 6567 } 6568 } 6569 6570 return 0; 6571 } 6572