1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 #include "nxt_port_queue.h" 11 #include "nxt_app_queue.h" 12 13 #include "nxt_unit.h" 14 #include "nxt_unit_request.h" 15 #include "nxt_unit_response.h" 16 #include "nxt_unit_websocket.h" 17 18 #include "nxt_websocket.h" 19 20 #if (NXT_HAVE_MEMFD_CREATE) 21 #include <linux/memfd.h> 22 #endif 23 24 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 25 #define NXT_UNIT_LOCAL_BUF_SIZE \ 26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 27 28 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 29 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 30 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 31 typedef struct nxt_unit_process_s nxt_unit_process_t; 32 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 33 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 34 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 35 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 36 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 37 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 38 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 39 40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 42 nxt_unit_ctx_impl_t *ctx_impl, void *data); 43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 50 nxt_unit_mmap_buf_t *mmap_buf); 51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 54 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 56 int queue_fd); 57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 58 nxt_unit_request_info_t **preq); 59 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 62 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 63 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); 64 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 65 nxt_unit_recv_msg_t *recv_msg); 66 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 67 nxt_unit_port_id_t *port_id); 68 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 69 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 70 nxt_unit_recv_msg_t *recv_msg); 71 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 72 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 73 nxt_unit_ctx_t *ctx); 74 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 75 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 76 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 77 nxt_unit_ctx_t *ctx); 78 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 79 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 80 nxt_unit_websocket_frame_impl_t *ws); 81 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 82 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 83 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 84 nxt_unit_mmap_buf_t *mmap_buf, int last); 85 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 86 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 87 
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 88 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 89 nxt_unit_ctx_impl_t *ctx_impl); 90 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 91 nxt_unit_read_buf_t *rbuf); 92 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 93 nxt_unit_request_info_t *req, size_t size); 94 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 95 size_t size); 96 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 97 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 98 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 99 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 100 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 101 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 102 nxt_unit_port_t *port, int n); 103 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 104 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 105 int fd); 106 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 107 nxt_unit_port_t *port, uint32_t size, 108 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 109 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 110 111 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 112 nxt_unit_ctx_impl_t *ctx_impl); 113 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 114 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 115 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 116 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 117 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 118 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 119 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 120 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 121 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t 
*rbuf); 122 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); 123 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 124 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 125 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 126 127 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 128 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 129 pid_t pid, int remove); 130 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 131 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 132 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 133 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 134 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 135 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 136 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 137 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 138 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 139 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 140 nxt_unit_port_t *port); 141 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 142 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 143 144 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 145 nxt_unit_port_t *port, int queue_fd); 146 147 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 148 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 149 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 150 nxt_unit_port_t *port, void *queue); 151 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 152 nxt_queue_t *awaiting_req); 153 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 154 nxt_unit_port_id_t *port_id); 155 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 156 nxt_unit_port_id_t *port_id); 157 
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 158 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 159 nxt_unit_process_t *process); 160 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 161 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 162 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 163 nxt_unit_port_t *port, const void *buf, size_t buf_size, 164 const void *oob, size_t oob_size); 165 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 166 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 167 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 168 nxt_unit_read_buf_t *rbuf); 169 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 170 nxt_unit_read_buf_t *src); 171 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 172 nxt_unit_read_buf_t *rbuf); 173 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 174 nxt_unit_read_buf_t *rbuf); 175 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 176 nxt_unit_read_buf_t *rbuf); 177 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, 178 nxt_unit_read_buf_t *rbuf); 179 nxt_inline int nxt_unit_close(int fd); 180 static int nxt_unit_fd_blocking(int fd); 181 182 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 183 nxt_unit_port_t *port); 184 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 185 nxt_unit_port_id_t *port_id, int remove); 186 187 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 188 nxt_unit_request_info_t *req); 189 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 190 nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 191 192 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 193 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 194 static void nxt_unit_lvlhsh_free(void *data, void *p); 195 static int nxt_unit_memcasecmp(const 
void *p1, const void *p2, size_t length);


/*
 * Shared-memory buffer wrapper.  Buffers form an intrusive doubly linked
 * list: 'next' points at the next element, 'prev' points back at the
 * previous element's 'next' field (or the list head pointer), which lets
 * an element unlink itself without knowing the head.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;       /* backing shared mmap segment */
    nxt_unit_request_info_t  *req;       /* owning request, if any */
    nxt_unit_ctx_impl_t      *ctx_impl;
    char                     *free_ptr;  /* memory to free on release —
                                            TODO confirm against buf code */
    char                     *plain_ptr;
};


/* A decoded incoming port message plus the resources received with it. */
struct nxt_unit_recv_msg_s {
    uint32_t             stream;
    nxt_pid_t            pid;
    nxt_port_id_t        reply_port;

    uint8_t              last;   /* 1 bit */
    uint8_t              mmap;   /* 1 bit */

    void                 *start;  /* payload start (after nxt_port_msg_t) */
    uint32_t             size;    /* payload size in bytes */

    int                  fd[2];   /* fds received via SCM_RIGHTS; -1 if none */

    nxt_unit_mmap_buf_t  *incoming_buf;
};


/* Request/response lifecycle state, advanced monotonically. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Internal companion of the public nxt_unit_request_info_t. */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;
    uint8_t                  in_hash;   /* linked into ctx request hash */

    /* for nxt_unit_ctx_impl_t.free_req or active_req */
    nxt_queue_link_t         link;
    /* for nxt_unit_port_impl_t.awaiting_req */
    nxt_queue_link_t         port_wait_link;

    /* per-request application data, sized by init->request_data_size */
    char                     extra_data[];
};


/* Internal companion of the public nxt_unit_websocket_frame_t. */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/* Fixed-size receive buffer for one port message plus its ancillary data. */
struct nxt_unit_read_buf_s {
    nxt_queue_link_t     link;
    nxt_unit_ctx_impl_t  *ctx_impl;
    ssize_t              size;        /* bytes actually received */
    char                 buf[16384];
    char                 oob[256];    /* control (cmsg) data */
};


/*
 * Internal per-thread context.  Reference-counted via use_count; freed by
 * nxt_unit_ctx_free() when the count drops to zero.  The embedded ctx_buf,
 * ctx_read_buf and req objects pre-seed the free lists so one request can
 * be served without dynamic allocation.
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    nxt_atomic_t                  use_count;
    nxt_atomic_t                  wait_items;

    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    nxt_queue_link_t              link;      /* in nxt_unit_impl_t.contexts */

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   ready_req;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   pending_rbuf;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   free_rbuf;

    int                           online;
    int                           ready;

    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    nxt_unit_request_info_impl_t  req;
};


/* One shared-memory segment tracked by an nxt_unit_mmaps_t collection. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t  *hdr;
    pthread_t               src_thread;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t             awaiting_rbuf;
};


/* Growable, mutex-protected array of shared-memory segments. */
struct nxt_unit_mmaps_s {
    pthread_mutex_t  mutex;
    uint32_t         size;
    uint32_t         cap;
    nxt_atomic_t     allocated_chunks;
    nxt_unit_mmap_t  *elts;
};


/*
 * Internal companion of the public nxt_unit_t: the library singleton.
 * Reference-counted; the main context is embedded so a minimal process
 * needs no extra allocation.
 */
struct nxt_unit_impl_s {
    nxt_unit_t            unit;
    nxt_unit_callbacks_t  callbacks;

    nxt_atomic_t          use_count;

    uint32_t              request_data_size;
    uint32_t              shm_mmap_limit;   /* max outgoing mmaps, in segments */

    pthread_mutex_t       mutex;

    nxt_lvlhsh_t          processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t          ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_t       *router_port;
    nxt_unit_port_t       *shared_port;

    nxt_queue_t           contexts;         /* of nxt_unit_ctx_impl_t */

    nxt_unit_mmaps_t      incoming;
    nxt_unit_mmaps_t      outgoing;

    pid_t                 pid;
    int                   log_fd;

    nxt_unit_ctx_impl_t   main_ctx;
};


/* Internal companion of the public nxt_unit_port_t. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t      port;

    nxt_atomic_t         use_count;

    /* for nxt_unit_process_t.ports */
    nxt_queue_link_t     link;
    nxt_unit_process_t   *process;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t          awaiting_req;

    int                  ready;

    void                 *queue;        /* mapped nxt_port_queue_t or
                                           nxt_app_queue_t, may be NULL */

    int                  from_socket;
    nxt_unit_read_buf_t  *socket_rbuf;
};


/* Peer process record; owns the list of ports attached to that process. */
struct nxt_unit_process_s {
    pid_t            pid;

    nxt_queue_t      ports;     /* of nxt_unit_port_impl_t */

    nxt_unit_impl_t  *lib;

    nxt_atomic_t     use_count;

    uint32_t         next_port_id;
};


/* Explicitly using 32 bit types to avoid possible alignment. */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Public library entry point: create the library singleton, establish the
 * ready/router/read ports (either from 'init' or from the NXT_UNIT_INIT
 * environment variable), map the read-port queue, and report readiness to
 * the router.  Returns the main context, or NULL on failure (in which case
 * all partially acquired resources are released via the fail path).
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc, queue_fd;
    void             *mem;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, router_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    queue_fd = -1;
    mem = MAP_FAILED;

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        /* Caller supplied the ports explicitly. */
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        router_port = init->router_port;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
                              router_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        /* Ports come from the environment set by the Unit daemon. */
        rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
                               &lib->log_fd, &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Convert the byte limit into a number of mmap segments. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                              / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;

    ctx = &lib->main_ctx.ctx;

    rc = nxt_unit_fd_blocking(router_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
    if (nxt_slow_path(lib->router_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add router_port");

        goto fail;
    }

    /* Shared-memory queue for the read port. */
    queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    rc = nxt_unit_fd_blocking(read_port.in_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
    if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    rc = nxt_unit_fd_blocking(ready_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    /* Tell the daemon we are ready, passing the queue fd along. */
    rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    nxt_unit_close(ready_port.out_fd);
    nxt_unit_close(queue_fd);

    return ctx;

fail:

    if (mem != MAP_FAILED) {
        munmap(mem, sizeof(nxt_port_queue_t));
    }

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    nxt_unit_ctx_release(&lib->main_ctx.ctx);

    return NULL;
}


/*
 * Allocate and initialize the library singleton (plus request_data_size
 * trailing bytes for the main context's request).  Returns NULL on error;
 * on the fail path the struct (and, where applicable, its mutex) is
 * released.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_unit_malloc(NULL,
                          sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;
    /* Round the byte limit up to whole shared-memory segments. */
    lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                          / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;

    nxt_queue_init(&lib->contexts);

    lib->use_count = 0;
    lib->router_port = NULL;
    lib->shared_port = NULL;

    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    cb = &lib->callbacks;

    if (cb->request_handler == NULL) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    nxt_unit_mmaps_init(&lib->incoming);
    nxt_unit_mmaps_init(&lib->outgoing);

    return lib;

fail:

    nxt_unit_free(NULL, lib);

    return NULL;
}


/*
 * Initialize a context: link it into lib->contexts (taking a library
 * reference) and pre-seed the free lists with the embedded buffers so the
 * first request needs no allocation.  Returns NXT_UNIT_OK or
 * NXT_UNIT_ERROR (mutex init failure).
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_lib_use(lib);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    ctx_impl->use_count = 1;
    ctx_impl->wait_items = 0;
    ctx_impl->online = 1;
    ctx_impl->ready = 0;

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);
    nxt_queue_init(&ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->free_rbuf);

    /* Seed the free lists with the statically embedded objects. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
    nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);

    ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port = NULL;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/* Take a reference on the context. */
nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}


/* Drop a reference on the context; free it when the last one goes away. */
nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
    long                 c;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* fetch_add returns the previous value: 1 means this was the last ref. */
    c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_ctx_free(ctx_impl);
    }
}


/* Take a reference on the library singleton. */
nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t *lib)
{
    nxt_atomic_fetch_add(&lib->use_count, 1);
}


/*
 * Drop a reference on the library singleton.  The last release tears
 * everything down: all known processes, the library mutex, the router and
 * shared ports, and both mmap collections.
 */
nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t *lib)
{
    long                c;
    nxt_unit_process_t  *process;

    c = nxt_atomic_fetch_add(&lib->use_count, -1);

    if (c == 1) {
        /* Drain and destroy every remaining process record. */
        for ( ;; ) {
            pthread_mutex_lock(&lib->mutex);

            process = nxt_unit_process_pop_first(lib);
            if (process == NULL) {
                pthread_mutex_unlock(&lib->mutex);

                break;
            }

            nxt_unit_remove_process(lib, process);
        }

        pthread_mutex_destroy(&lib->mutex);

        if (nxt_fast_path(lib->router_port != NULL)) {
            nxt_unit_port_release(lib->router_port);
        }

        if (nxt_fast_path(lib->shared_port != NULL)) {
            nxt_unit_port_release(lib->shared_port);
        }

        nxt_unit_mmaps_destroy(&lib->incoming);
        nxt_unit_mmaps_destroy(&lib->outgoing);

        nxt_unit_free(NULL, lib);
    }
}


/*
 * Push 'mmap_buf' onto the front of the list at '*head'.  'prev' always
 * points at the pointer that points at this element, so unlink is O(1).
 */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/* Append 'mmap_buf' at the end of the list reachable from '*prev'. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    /* Walk to the terminating NULL 'next' pointer, then insert there. */
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/* Remove 'mmap_buf' from whatever list it is on (no-op if prev is NULL). */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse the NXT_UNIT_INIT_ENV environment variable written by the Unit
 * daemon.  Expected format (after a leading "NXT_VERSION;" prefix that is
 * verified first):
 *
 *   stream;ready_pid,ready_id,ready_fd;router_pid,router_id,router_fd;
 *   read_pid,read_id,read_in_fd,read_out_fd;log_fd,shm_limit
 *
 * Fills in the three port descriptors, the log fd, the ready stream id and
 * the shared-memory limit.  Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the
 * variable is absent, has a version mismatch, or fails to scan.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, router_fd, read_in_fd, read_out_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, router_pid, read_pid;
    uint32_t  ready_stream, router_id, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    /* The variable must begin with the exact library version string. */
    version_length = nxt_length(NXT_VERSION);

    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    rc = sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d,%d;"
                "%d,%"PRIu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &router_pid, &router_id, &router_fd,
                &read_pid, &read_id, &read_in_fd, &read_out_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 13)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    /* The ready port is write-only from our side. */
    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);

    /* The router port is write-only from our side. */
    router_port->in_fd = -1;
    router_port->out_fd = router_fd;
    router_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_in_fd;
    read_port->out_fd = read_out_fd;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Send the _NXT_PORT_MSG_PROCESS_READY message on 'ready_fd', passing
 * 'queue_fd' as ancillary data (SCM_RIGHTS) so the daemon can map our
 * port queue.  Returns NXT_UNIT_OK on a complete send.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break
     * strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));

    res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
                           &cmsg, sizeof(cmsg));
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Decode and dispatch one received port message from 'rbuf'.
 *
 * Extracts up to two SCM_RIGHTS file descriptors into recv_msg.fd[], then
 * routes on the message type.  Ownership rules: any fd still recorded in
 * recv_msg.fd[] at 'done' is closed here; handlers that keep an fd reset
 * the slot to -1.  The same applies to recv_msg.incoming_buf.  A zero-size
 * read means the port is closed and triggers a quiet shutdown.
 *
 * Returns NXT_UNIT_OK, NXT_UNIT_ERROR, or NXT_UNIT_AGAIN (in which case
 * 'rbuf' is NOT released — it is still referenced by a pending operation).
 *
 * NOTE(review): rbuf->oob is interpreted as a cmsghdr without any check
 * that control data was actually received for this message — presumably
 * the receive path guarantees/zeroes it; verify against the recv code.
 */
static int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
    nxt_unit_request_info_t **preq)
{
    int                  rc;
    pid_t                pid;
    struct cmsghdr       *cm;
    nxt_port_msg_t       *port_msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_recv_msg_t  recv_msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    recv_msg.fd[0] = -1;
    recv_msg.fd[1] = -1;
    port_msg = (nxt_port_msg_t *) rbuf->buf;
    cm = (struct cmsghdr *) rbuf->oob;

    if (cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
        }

        if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
        }
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
        /* Zero bytes means the peer closed the port: orderly quit. */
        if (nxt_slow_path(rbuf->size == 0)) {
            nxt_unit_debug(ctx, "read port closed");

            nxt_unit_quit(ctx);
            rc = NXT_UNIT_OK;
            goto done;
        }

        nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

    nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
                   port_msg->stream, (int) port_msg->type,
                   recv_msg.fd[0], recv_msg.fd[1]);

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* Payload immediately follows the fixed header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    if (port_msg->mmap) {
        /* Payload lives in shared memory; pull it in before dispatch. */
        rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            if (rc == NXT_UNIT_AGAIN) {
                /* fds are kept by the pending read; don't close them. */
                recv_msg.fd[0] = -1;
                recv_msg.fd[1] = -1;
            }

            goto done;
        }
    }

    switch (port_msg->type) {

    case _NXT_PORT_MSG_RPC_READY:
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        nxt_unit_quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_PORT_ACK:
        rc = nxt_unit_ctx_ready(ctx);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /* Redirect logging to the newly received fd. */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd[0]);

        if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
            nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
                           port_msg->stream, recv_msg.fd[0], lib->log_fd,
                           strerror(errno), errno);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd[0] < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd[0]);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
        break;

    case _NXT_PORT_MSG_REQ_BODY:
        rc = nxt_unit_process_req_body(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                           (int) sizeof(pid));

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

done:

    /* Close any fds nobody claimed. */
    if (recv_msg.fd[0] != -1) {
        nxt_unit_close(recv_msg.fd[0]);
    }

    if (recv_msg.fd[1] != -1) {
        nxt_unit_close(recv_msg.fd[1]);
    }

    /* Free any incoming buffers nobody claimed (free unlinks them). */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        /* Poison the consumed buffer to catch stale reads in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handle _NXT_PORT_MSG_NEW_PORT: register a port announced by the daemon.
 * fd[0] is the port socket, fd[1] backs the port's shared-memory queue.
 * The shared app port (NXT_UNIT_SHARED_PORT_ID) maps an nxt_app_queue_t
 * and marks the context ready; a regular peer port maps an
 * nxt_port_queue_t.  On success fd[0] ownership moves to the port object
 * (signalled by resetting recv_msg->fd[0] to -1); fd[1] is closed by the
 * caller in either case.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* Shared application port: read side only, app-queue backed. */
        nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);

        new_port.in_fd = recv_msg->fd[0];
        new_port.out_fd = -1;

        mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);

    } else {
        /* Regular peer port: write side only, port-queue backed. */
        if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
                          != NXT_UNIT_OK))
        {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                              new_port_msg->id);

        new_port.in_fd = -1;
        new_port.out_fd = recv_msg->fd[0];

        mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);
    }

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /* fd[0] now belongs to the port; prevent the caller from closing it. */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        lib->shared_port = port;

        return nxt_unit_ctx_ready(ctx);
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Mark the context ready and invoke the application's optional
 * ready_handler callback; propagates the callback's return value.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    ctx_impl->ready = 1;

    if (lib->callbacks.ready_handler) {
        return lib->callbacks.ready_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Handle _NXT_PORT_MSG_REQ_HEADERS: build a request object from the
 * shared-memory payload (an nxt_unit_request_t followed by preread
 * content), adopt the incoming buffer list and content fd from recv_msg,
 * then acknowledge and hand the request to the application.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_request_info_t **preq)
{
    int                           res;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    /* Request headers are always delivered via shared memory. */
    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Body content starts with whatever was preread into the first buffer. */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    req_impl->stream = recv_msg->stream;
req_impl->outgoing_buf = NULL; 1272 1273 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1274 b->req = req; 1275 } 1276 1277 /* "Move" incoming buffer list to req_impl. */ 1278 req_impl->incoming_buf = recv_msg->incoming_buf; 1279 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 1280 recv_msg->incoming_buf = NULL; 1281 1282 req->content_fd = recv_msg->fd[0]; 1283 recv_msg->fd[0] = -1; 1284 1285 req->response_max_fields = 0; 1286 req_impl->state = NXT_UNIT_RS_START; 1287 req_impl->websocket = 0; 1288 req_impl->in_hash = 0; 1289 1290 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 1291 (int) r->method_length, 1292 (char *) nxt_unit_sptr_get(&r->method), 1293 (int) r->target_length, 1294 (char *) nxt_unit_sptr_get(&r->target), 1295 (int) r->content_length); 1296 1297 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); 1298 1299 res = nxt_unit_request_check_response_port(req, &port_id); 1300 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1301 return NXT_UNIT_ERROR; 1302 } 1303 1304 if (nxt_fast_path(res == NXT_UNIT_OK)) { 1305 res = nxt_unit_send_req_headers_ack(req); 1306 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1307 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1308 1309 return NXT_UNIT_ERROR; 1310 } 1311 1312 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1313 1314 if (req->content_length 1315 > (uint64_t) (req->content_buf->end - req->content_buf->free)) 1316 { 1317 res = nxt_unit_request_hash_add(ctx, req); 1318 if (nxt_slow_path(res != NXT_UNIT_OK)) { 1319 nxt_unit_req_warn(req, "failed to add request to hash"); 1320 1321 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1322 1323 return NXT_UNIT_ERROR; 1324 } 1325 1326 /* 1327 * If application have separate data handler, we may start 1328 * request processing and process data when it is arrived. 
1329 */ 1330 if (lib->callbacks.data_handler == NULL) { 1331 return NXT_UNIT_OK; 1332 } 1333 } 1334 1335 if (preq == NULL) { 1336 lib->callbacks.request_handler(req); 1337 1338 } else { 1339 *preq = req; 1340 } 1341 } 1342 1343 return NXT_UNIT_OK; 1344 } 1345 1346 1347 static int 1348 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1349 { 1350 uint64_t l; 1351 nxt_unit_impl_t *lib; 1352 nxt_unit_mmap_buf_t *b; 1353 nxt_unit_request_info_t *req; 1354 1355 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); 1356 if (req == NULL) { 1357 return NXT_UNIT_OK; 1358 } 1359 1360 l = req->content_buf->end - req->content_buf->free; 1361 1362 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1363 b->req = req; 1364 l += b->buf.end - b->buf.free; 1365 } 1366 1367 if (recv_msg->incoming_buf != NULL) { 1368 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); 1369 1370 while (b->next != NULL) { 1371 b = b->next; 1372 } 1373 1374 /* "Move" incoming buffer list to req_impl. 
*/ 1375 b->next = recv_msg->incoming_buf; 1376 b->next->prev = &b->next; 1377 1378 recv_msg->incoming_buf = NULL; 1379 } 1380 1381 req->content_fd = recv_msg->fd[0]; 1382 recv_msg->fd[0] = -1; 1383 1384 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1385 1386 if (lib->callbacks.data_handler != NULL) { 1387 lib->callbacks.data_handler(req); 1388 1389 return NXT_UNIT_OK; 1390 } 1391 1392 if (req->content_fd != -1 || l == req->content_length) { 1393 lib->callbacks.request_handler(req); 1394 } 1395 1396 return NXT_UNIT_OK; 1397 } 1398 1399 1400 static int 1401 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 1402 nxt_unit_port_id_t *port_id) 1403 { 1404 int res; 1405 nxt_unit_ctx_t *ctx; 1406 nxt_unit_impl_t *lib; 1407 nxt_unit_port_t *port; 1408 nxt_unit_process_t *process; 1409 nxt_unit_ctx_impl_t *ctx_impl; 1410 nxt_unit_port_impl_t *port_impl; 1411 nxt_unit_request_info_impl_t *req_impl; 1412 1413 ctx = req->ctx; 1414 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1415 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1416 1417 pthread_mutex_lock(&lib->mutex); 1418 1419 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 1420 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 1421 1422 if (nxt_fast_path(port != NULL)) { 1423 req->response_port = port; 1424 1425 if (nxt_fast_path(port_impl->ready)) { 1426 pthread_mutex_unlock(&lib->mutex); 1427 1428 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", 1429 (int) port->id.pid, (int) port->id.id); 1430 1431 return NXT_UNIT_OK; 1432 } 1433 1434 nxt_unit_debug(ctx, "check_response_port: " 1435 "port{%d,%d} already requested", 1436 (int) port->id.pid, (int) port->id.id); 1437 1438 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1439 1440 nxt_queue_insert_tail(&port_impl->awaiting_req, 1441 &req_impl->port_wait_link); 1442 1443 pthread_mutex_unlock(&lib->mutex); 1444 1445 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 1446 
1447 return NXT_UNIT_AGAIN; 1448 } 1449 1450 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); 1451 if (nxt_slow_path(port_impl == NULL)) { 1452 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", 1453 (int) sizeof(nxt_unit_port_impl_t)); 1454 1455 pthread_mutex_unlock(&lib->mutex); 1456 1457 return NXT_UNIT_ERROR; 1458 } 1459 1460 port = &port_impl->port; 1461 1462 port->id = *port_id; 1463 port->in_fd = -1; 1464 port->out_fd = -1; 1465 port->data = NULL; 1466 1467 res = nxt_unit_port_hash_add(&lib->ports, port); 1468 if (nxt_slow_path(res != NXT_UNIT_OK)) { 1469 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", 1470 port->id.pid, port->id.id); 1471 1472 pthread_mutex_unlock(&lib->mutex); 1473 1474 nxt_unit_free(ctx, port); 1475 1476 return NXT_UNIT_ERROR; 1477 } 1478 1479 process = nxt_unit_process_find(lib, port_id->pid, 0); 1480 if (nxt_slow_path(process == NULL)) { 1481 nxt_unit_alert(ctx, "check_response_port: process %d not found", 1482 port->id.pid); 1483 1484 nxt_unit_port_hash_find(&lib->ports, port_id, 1); 1485 1486 pthread_mutex_unlock(&lib->mutex); 1487 1488 nxt_unit_free(ctx, port); 1489 1490 return NXT_UNIT_ERROR; 1491 } 1492 1493 nxt_queue_insert_tail(&process->ports, &port_impl->link); 1494 1495 port_impl->process = process; 1496 port_impl->queue = NULL; 1497 port_impl->from_socket = 0; 1498 port_impl->socket_rbuf = NULL; 1499 1500 nxt_queue_init(&port_impl->awaiting_req); 1501 1502 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1503 1504 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); 1505 1506 port_impl->use_count = 2; 1507 port_impl->ready = 0; 1508 1509 req->response_port = port; 1510 1511 pthread_mutex_unlock(&lib->mutex); 1512 1513 res = nxt_unit_get_port(ctx, port_id); 1514 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1515 return NXT_UNIT_ERROR; 1516 } 1517 1518 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 1519 1520 return NXT_UNIT_AGAIN; 1521 } 1522 
1523 1524 static int 1525 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) 1526 { 1527 ssize_t res; 1528 nxt_port_msg_t msg; 1529 nxt_unit_impl_t *lib; 1530 nxt_unit_ctx_impl_t *ctx_impl; 1531 nxt_unit_request_info_impl_t *req_impl; 1532 1533 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 1534 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1535 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1536 1537 memset(&msg, 0, sizeof(nxt_port_msg_t)); 1538 1539 msg.stream = req_impl->stream; 1540 msg.pid = lib->pid; 1541 msg.reply_port = ctx_impl->read_port->id.id; 1542 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; 1543 1544 res = nxt_unit_port_send(req->ctx, req->response_port, 1545 &msg, sizeof(msg), NULL, 0); 1546 if (nxt_slow_path(res != sizeof(msg))) { 1547 return NXT_UNIT_ERROR; 1548 } 1549 1550 return NXT_UNIT_OK; 1551 } 1552 1553 1554 static int 1555 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1556 { 1557 size_t hsize; 1558 nxt_unit_impl_t *lib; 1559 nxt_unit_mmap_buf_t *b; 1560 nxt_unit_callbacks_t *cb; 1561 nxt_unit_request_info_t *req; 1562 nxt_unit_request_info_impl_t *req_impl; 1563 nxt_unit_websocket_frame_impl_t *ws_impl; 1564 1565 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); 1566 if (nxt_slow_path(req == NULL)) { 1567 return NXT_UNIT_OK; 1568 } 1569 1570 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1571 1572 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1573 cb = &lib->callbacks; 1574 1575 if (cb->websocket_handler && recv_msg->size >= 2) { 1576 ws_impl = nxt_unit_websocket_frame_get(ctx); 1577 if (nxt_slow_path(ws_impl == NULL)) { 1578 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 1579 req_impl->stream); 1580 1581 return NXT_UNIT_ERROR; 1582 } 1583 1584 ws_impl->ws.req = req; 1585 1586 ws_impl->buf = NULL; 1587 1588 if (recv_msg->mmap) { 1589 for (b = recv_msg->incoming_buf; b 
!= NULL; b = b->next) { 1590 b->req = req; 1591 } 1592 1593 /* "Move" incoming buffer list to ws_impl. */ 1594 ws_impl->buf = recv_msg->incoming_buf; 1595 ws_impl->buf->prev = &ws_impl->buf; 1596 recv_msg->incoming_buf = NULL; 1597 1598 b = ws_impl->buf; 1599 1600 } else { 1601 b = nxt_unit_mmap_buf_get(ctx); 1602 if (nxt_slow_path(b == NULL)) { 1603 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 1604 req_impl->stream); 1605 1606 nxt_unit_websocket_frame_release(&ws_impl->ws); 1607 1608 return NXT_UNIT_ERROR; 1609 } 1610 1611 b->req = req; 1612 b->buf.start = recv_msg->start; 1613 b->buf.free = b->buf.start; 1614 b->buf.end = b->buf.start + recv_msg->size; 1615 1616 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 1617 } 1618 1619 ws_impl->ws.header = (void *) b->buf.start; 1620 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1621 ws_impl->ws.header); 1622 1623 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1624 1625 if (ws_impl->ws.header->mask) { 1626 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1627 1628 } else { 1629 ws_impl->ws.mask = NULL; 1630 } 1631 1632 b->buf.free += hsize; 1633 1634 ws_impl->ws.content_buf = &b->buf; 1635 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1636 1637 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1638 "payload_len=%"PRIu64, 1639 ws_impl->ws.header->opcode, 1640 ws_impl->ws.payload_len); 1641 1642 cb->websocket_handler(&ws_impl->ws); 1643 } 1644 1645 if (recv_msg->last) { 1646 req_impl->websocket = 0; 1647 1648 if (cb->close_handler) { 1649 nxt_unit_req_debug(req, "close_handler"); 1650 1651 cb->close_handler(req); 1652 1653 } else { 1654 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1655 } 1656 } 1657 1658 return NXT_UNIT_OK; 1659 } 1660 1661 1662 static int 1663 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) 1664 { 1665 nxt_unit_impl_t *lib; 1666 nxt_unit_callbacks_t *cb; 1667 1668 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1669 cb = &lib->callbacks; 1670 
1671 if (cb->shm_ack_handler != NULL) { 1672 cb->shm_ack_handler(ctx); 1673 } 1674 1675 return NXT_UNIT_OK; 1676 } 1677 1678 1679 static nxt_unit_request_info_impl_t * 1680 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1681 { 1682 nxt_unit_impl_t *lib; 1683 nxt_queue_link_t *lnk; 1684 nxt_unit_ctx_impl_t *ctx_impl; 1685 nxt_unit_request_info_impl_t *req_impl; 1686 1687 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1688 1689 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1690 1691 pthread_mutex_lock(&ctx_impl->mutex); 1692 1693 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1694 pthread_mutex_unlock(&ctx_impl->mutex); 1695 1696 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t) 1697 + lib->request_data_size); 1698 if (nxt_slow_path(req_impl == NULL)) { 1699 return NULL; 1700 } 1701 1702 req_impl->req.unit = ctx->unit; 1703 req_impl->req.ctx = ctx; 1704 1705 pthread_mutex_lock(&ctx_impl->mutex); 1706 1707 } else { 1708 lnk = nxt_queue_first(&ctx_impl->free_req); 1709 nxt_queue_remove(lnk); 1710 1711 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1712 } 1713 1714 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1715 1716 pthread_mutex_unlock(&ctx_impl->mutex); 1717 1718 req_impl->req.data = lib->request_data_size ? 
req_impl->extra_data : NULL; 1719 1720 return req_impl; 1721 } 1722 1723 1724 static void 1725 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1726 { 1727 nxt_unit_ctx_impl_t *ctx_impl; 1728 nxt_unit_request_info_impl_t *req_impl; 1729 1730 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1731 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1732 1733 req->response = NULL; 1734 req->response_buf = NULL; 1735 1736 if (req_impl->in_hash) { 1737 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); 1738 } 1739 1740 req_impl->websocket = 0; 1741 1742 while (req_impl->outgoing_buf != NULL) { 1743 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1744 } 1745 1746 while (req_impl->incoming_buf != NULL) { 1747 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1748 } 1749 1750 if (req->content_fd != -1) { 1751 nxt_unit_close(req->content_fd); 1752 1753 req->content_fd = -1; 1754 } 1755 1756 if (req->response_port != NULL) { 1757 nxt_unit_port_release(req->response_port); 1758 1759 req->response_port = NULL; 1760 } 1761 1762 req_impl->state = NXT_UNIT_RS_RELEASED; 1763 1764 pthread_mutex_lock(&ctx_impl->mutex); 1765 1766 nxt_queue_remove(&req_impl->link); 1767 1768 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 1769 1770 pthread_mutex_unlock(&ctx_impl->mutex); 1771 } 1772 1773 1774 static void 1775 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1776 { 1777 nxt_unit_ctx_impl_t *ctx_impl; 1778 1779 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1780 1781 nxt_queue_remove(&req_impl->link); 1782 1783 if (req_impl != &ctx_impl->req) { 1784 nxt_unit_free(&ctx_impl->ctx, req_impl); 1785 } 1786 } 1787 1788 1789 static nxt_unit_websocket_frame_impl_t * 1790 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1791 { 1792 nxt_queue_link_t *lnk; 1793 nxt_unit_ctx_impl_t *ctx_impl; 1794 nxt_unit_websocket_frame_impl_t *ws_impl; 1795 1796 ctx_impl = nxt_container_of(ctx, 
nxt_unit_ctx_impl_t, ctx); 1797 1798 pthread_mutex_lock(&ctx_impl->mutex); 1799 1800 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1801 pthread_mutex_unlock(&ctx_impl->mutex); 1802 1803 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t)); 1804 if (nxt_slow_path(ws_impl == NULL)) { 1805 return NULL; 1806 } 1807 1808 } else { 1809 lnk = nxt_queue_first(&ctx_impl->free_ws); 1810 nxt_queue_remove(lnk); 1811 1812 pthread_mutex_unlock(&ctx_impl->mutex); 1813 1814 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1815 } 1816 1817 ws_impl->ctx_impl = ctx_impl; 1818 1819 return ws_impl; 1820 } 1821 1822 1823 static void 1824 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1825 { 1826 nxt_unit_websocket_frame_impl_t *ws_impl; 1827 1828 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 1829 1830 while (ws_impl->buf != NULL) { 1831 nxt_unit_mmap_buf_free(ws_impl->buf); 1832 } 1833 1834 ws->req = NULL; 1835 1836 pthread_mutex_lock(&ws_impl->ctx_impl->mutex); 1837 1838 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); 1839 1840 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); 1841 } 1842 1843 1844 static void 1845 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 1846 nxt_unit_websocket_frame_impl_t *ws_impl) 1847 { 1848 nxt_queue_remove(&ws_impl->link); 1849 1850 nxt_unit_free(ctx, ws_impl); 1851 } 1852 1853 1854 uint16_t 1855 nxt_unit_field_hash(const char *name, size_t name_length) 1856 { 1857 u_char ch; 1858 uint32_t hash; 1859 const char *p, *end; 1860 1861 hash = 159406; /* Magic value copied from nxt_http_parse.c */ 1862 end = name + name_length; 1863 1864 for (p = name; p < end; p++) { 1865 ch = *p; 1866 hash = (hash << 4) + hash + nxt_lowcase(ch); 1867 } 1868 1869 hash = (hash >> 16) ^ hash; 1870 1871 return hash; 1872 } 1873 1874 1875 void 1876 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) 1877 { 1878 char *name; 1879 uint32_t i, j; 1880 nxt_unit_field_t 
*fields, f; 1881 nxt_unit_request_t *r; 1882 1883 static nxt_str_t content_length = nxt_string("content-length"); 1884 static nxt_str_t content_type = nxt_string("content-type"); 1885 static nxt_str_t cookie = nxt_string("cookie"); 1886 1887 nxt_unit_req_debug(req, "group_dup_fields"); 1888 1889 r = req->request; 1890 fields = r->fields; 1891 1892 for (i = 0; i < r->fields_count; i++) { 1893 name = nxt_unit_sptr_get(&fields[i].name); 1894 1895 switch (fields[i].hash) { 1896 case NXT_UNIT_HASH_CONTENT_LENGTH: 1897 if (fields[i].name_length == content_length.length 1898 && nxt_unit_memcasecmp(name, content_length.start, 1899 content_length.length) == 0) 1900 { 1901 r->content_length_field = i; 1902 } 1903 1904 break; 1905 1906 case NXT_UNIT_HASH_CONTENT_TYPE: 1907 if (fields[i].name_length == content_type.length 1908 && nxt_unit_memcasecmp(name, content_type.start, 1909 content_type.length) == 0) 1910 { 1911 r->content_type_field = i; 1912 } 1913 1914 break; 1915 1916 case NXT_UNIT_HASH_COOKIE: 1917 if (fields[i].name_length == cookie.length 1918 && nxt_unit_memcasecmp(name, cookie.start, 1919 cookie.length) == 0) 1920 { 1921 r->cookie_field = i; 1922 } 1923 1924 break; 1925 } 1926 1927 for (j = i + 1; j < r->fields_count; j++) { 1928 if (fields[i].hash != fields[j].hash 1929 || fields[i].name_length != fields[j].name_length 1930 || nxt_unit_memcasecmp(name, 1931 nxt_unit_sptr_get(&fields[j].name), 1932 fields[j].name_length) != 0) 1933 { 1934 continue; 1935 } 1936 1937 f = fields[j]; 1938 f.value.offset += (j - (i + 1)) * sizeof(f); 1939 1940 while (j > i + 1) { 1941 fields[j] = fields[j - 1]; 1942 fields[j].name.offset -= sizeof(f); 1943 fields[j].value.offset -= sizeof(f); 1944 j--; 1945 } 1946 1947 fields[j] = f; 1948 1949 /* Assign the same name pointer for further grouping simplicity. 
*/ 1950 nxt_unit_sptr_set(&fields[j].name, name); 1951 1952 i++; 1953 } 1954 } 1955 } 1956 1957 1958 int 1959 nxt_unit_response_init(nxt_unit_request_info_t *req, 1960 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 1961 { 1962 uint32_t buf_size; 1963 nxt_unit_buf_t *buf; 1964 nxt_unit_request_info_impl_t *req_impl; 1965 1966 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1967 1968 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1969 nxt_unit_req_warn(req, "init: response already sent"); 1970 1971 return NXT_UNIT_ERROR; 1972 } 1973 1974 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1975 (int) max_fields_count, (int) max_fields_size); 1976 1977 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1978 nxt_unit_req_debug(req, "duplicate response init"); 1979 } 1980 1981 /* 1982 * Each field name and value 0-terminated by libunit, 1983 * this is the reason of '+ 2' below. 1984 */ 1985 buf_size = sizeof(nxt_unit_response_t) 1986 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1987 + max_fields_size; 1988 1989 if (nxt_slow_path(req->response_buf != NULL)) { 1990 buf = req->response_buf; 1991 1992 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1993 goto init_response; 1994 } 1995 1996 nxt_unit_buf_free(buf); 1997 1998 req->response_buf = NULL; 1999 req->response = NULL; 2000 req->response_max_fields = 0; 2001 2002 req_impl->state = NXT_UNIT_RS_START; 2003 } 2004 2005 buf = nxt_unit_response_buf_alloc(req, buf_size); 2006 if (nxt_slow_path(buf == NULL)) { 2007 return NXT_UNIT_ERROR; 2008 } 2009 2010 init_response: 2011 2012 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 2013 2014 req->response_buf = buf; 2015 2016 req->response = (nxt_unit_response_t *) buf->start; 2017 req->response->status = status; 2018 2019 buf->free = buf->start + sizeof(nxt_unit_response_t) 2020 + max_fields_count * sizeof(nxt_unit_field_t); 2021 2022 
req->response_max_fields = max_fields_count; 2023 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 2024 2025 return NXT_UNIT_OK; 2026 } 2027 2028 2029 int 2030 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 2031 uint32_t max_fields_count, uint32_t max_fields_size) 2032 { 2033 char *p; 2034 uint32_t i, buf_size; 2035 nxt_unit_buf_t *buf; 2036 nxt_unit_field_t *f, *src; 2037 nxt_unit_response_t *resp; 2038 nxt_unit_request_info_impl_t *req_impl; 2039 2040 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2041 2042 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2043 nxt_unit_req_warn(req, "realloc: response not init"); 2044 2045 return NXT_UNIT_ERROR; 2046 } 2047 2048 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2049 nxt_unit_req_warn(req, "realloc: response already sent"); 2050 2051 return NXT_UNIT_ERROR; 2052 } 2053 2054 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 2055 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 2056 2057 return NXT_UNIT_ERROR; 2058 } 2059 2060 /* 2061 * Each field name and value 0-terminated by libunit, 2062 * this is the reason of '+ 2' below. 
2063 */ 2064 buf_size = sizeof(nxt_unit_response_t) 2065 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2066 + max_fields_size; 2067 2068 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 2069 2070 buf = nxt_unit_response_buf_alloc(req, buf_size); 2071 if (nxt_slow_path(buf == NULL)) { 2072 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 2073 return NXT_UNIT_ERROR; 2074 } 2075 2076 resp = (nxt_unit_response_t *) buf->start; 2077 2078 memset(resp, 0, sizeof(nxt_unit_response_t)); 2079 2080 resp->status = req->response->status; 2081 resp->content_length = req->response->content_length; 2082 2083 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 2084 f = resp->fields; 2085 2086 for (i = 0; i < req->response->fields_count; i++) { 2087 src = req->response->fields + i; 2088 2089 if (nxt_slow_path(src->skip != 0)) { 2090 continue; 2091 } 2092 2093 if (nxt_slow_path(src->name_length + src->value_length + 2 2094 > (uint32_t) (buf->end - p))) 2095 { 2096 nxt_unit_req_warn(req, "realloc: not enough space for field" 2097 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 2098 i, src, src->name_length, src->value_length); 2099 2100 goto fail; 2101 } 2102 2103 nxt_unit_sptr_set(&f->name, p); 2104 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 2105 *p++ = '\0'; 2106 2107 nxt_unit_sptr_set(&f->value, p); 2108 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 2109 *p++ = '\0'; 2110 2111 f->hash = src->hash; 2112 f->skip = 0; 2113 f->name_length = src->name_length; 2114 f->value_length = src->value_length; 2115 2116 resp->fields_count++; 2117 f++; 2118 } 2119 2120 if (req->response->piggyback_content_length > 0) { 2121 if (nxt_slow_path(req->response->piggyback_content_length 2122 > (uint32_t) (buf->end - p))) 2123 { 2124 nxt_unit_req_warn(req, "realloc: not enought space for content" 2125 " #%"PRIu32", %"PRIu32" required", 2126 i, req->response->piggyback_content_length); 2127 2128 goto fail; 2129 } 
2130 2131 resp->piggyback_content_length = 2132 req->response->piggyback_content_length; 2133 2134 nxt_unit_sptr_set(&resp->piggyback_content, p); 2135 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 2136 req->response->piggyback_content_length); 2137 } 2138 2139 buf->free = p; 2140 2141 nxt_unit_buf_free(req->response_buf); 2142 2143 req->response = resp; 2144 req->response_buf = buf; 2145 req->response_max_fields = max_fields_count; 2146 2147 return NXT_UNIT_OK; 2148 2149 fail: 2150 2151 nxt_unit_buf_free(buf); 2152 2153 return NXT_UNIT_ERROR; 2154 } 2155 2156 2157 int 2158 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 2159 { 2160 nxt_unit_request_info_impl_t *req_impl; 2161 2162 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2163 2164 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 2165 } 2166 2167 2168 int 2169 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 2170 const char *name, uint8_t name_length, 2171 const char *value, uint32_t value_length) 2172 { 2173 nxt_unit_buf_t *buf; 2174 nxt_unit_field_t *f; 2175 nxt_unit_response_t *resp; 2176 nxt_unit_request_info_impl_t *req_impl; 2177 2178 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2179 2180 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 2181 nxt_unit_req_warn(req, "add_field: response not initialized or " 2182 "already sent"); 2183 2184 return NXT_UNIT_ERROR; 2185 } 2186 2187 resp = req->response; 2188 2189 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 2190 nxt_unit_req_warn(req, "add_field: too many response fields (%d)", 2191 (int) resp->fields_count); 2192 2193 return NXT_UNIT_ERROR; 2194 } 2195 2196 buf = req->response_buf; 2197 2198 if (nxt_slow_path(name_length + value_length + 2 2199 > (uint32_t) (buf->end - buf->free))) 2200 { 2201 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 2202 2203 return NXT_UNIT_ERROR; 2204 } 2205 2206 
nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 2207 resp->fields_count, 2208 (int) name_length, name, 2209 (int) value_length, value); 2210 2211 f = resp->fields + resp->fields_count; 2212 2213 nxt_unit_sptr_set(&f->name, buf->free); 2214 buf->free = nxt_cpymem(buf->free, name, name_length); 2215 *buf->free++ = '\0'; 2216 2217 nxt_unit_sptr_set(&f->value, buf->free); 2218 buf->free = nxt_cpymem(buf->free, value, value_length); 2219 *buf->free++ = '\0'; 2220 2221 f->hash = nxt_unit_field_hash(name, name_length); 2222 f->skip = 0; 2223 f->name_length = name_length; 2224 f->value_length = value_length; 2225 2226 resp->fields_count++; 2227 2228 return NXT_UNIT_OK; 2229 } 2230 2231 2232 int 2233 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 2234 const void* src, uint32_t size) 2235 { 2236 nxt_unit_buf_t *buf; 2237 nxt_unit_response_t *resp; 2238 nxt_unit_request_info_impl_t *req_impl; 2239 2240 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2241 2242 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2243 nxt_unit_req_warn(req, "add_content: response not initialized yet"); 2244 2245 return NXT_UNIT_ERROR; 2246 } 2247 2248 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2249 nxt_unit_req_warn(req, "add_content: response already sent"); 2250 2251 return NXT_UNIT_ERROR; 2252 } 2253 2254 buf = req->response_buf; 2255 2256 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 2257 nxt_unit_req_warn(req, "add_content: buffer overflow"); 2258 2259 return NXT_UNIT_ERROR; 2260 } 2261 2262 resp = req->response; 2263 2264 if (resp->piggyback_content_length == 0) { 2265 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 2266 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 2267 } 2268 2269 resp->piggyback_content_length += size; 2270 2271 buf->free = nxt_cpymem(buf->free, src, size); 2272 2273 return NXT_UNIT_OK; 2274 } 2275 2276 2277 int 2278 
nxt_unit_response_send(nxt_unit_request_info_t *req) 2279 { 2280 int rc; 2281 nxt_unit_mmap_buf_t *mmap_buf; 2282 nxt_unit_request_info_impl_t *req_impl; 2283 2284 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2285 2286 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2287 nxt_unit_req_warn(req, "send: response is not initialized yet"); 2288 2289 return NXT_UNIT_ERROR; 2290 } 2291 2292 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2293 nxt_unit_req_warn(req, "send: response already sent"); 2294 2295 return NXT_UNIT_ERROR; 2296 } 2297 2298 if (req->request->websocket_handshake && req->response->status == 101) { 2299 nxt_unit_response_upgrade(req); 2300 } 2301 2302 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 2303 req->response->fields_count, 2304 (int) (req->response_buf->free 2305 - req->response_buf->start)); 2306 2307 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 2308 2309 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2310 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 2311 req->response = NULL; 2312 req->response_buf = NULL; 2313 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2314 2315 nxt_unit_mmap_buf_free(mmap_buf); 2316 } 2317 2318 return rc; 2319 } 2320 2321 2322 int 2323 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 2324 { 2325 nxt_unit_request_info_impl_t *req_impl; 2326 2327 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2328 2329 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 2330 } 2331 2332 2333 nxt_unit_buf_t * 2334 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 2335 { 2336 int rc; 2337 nxt_unit_mmap_buf_t *mmap_buf; 2338 nxt_unit_request_info_impl_t *req_impl; 2339 2340 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 2341 nxt_unit_req_warn(req, "response_buf_alloc: " 2342 "requested buffer (%"PRIu32") too big", size); 2343 2344 return NULL; 2345 } 2346 2347 nxt_unit_req_debug(req, 
"response_buf_alloc: %"PRIu32, size); 2348 2349 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2350 2351 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2352 if (nxt_slow_path(mmap_buf == NULL)) { 2353 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 2354 2355 return NULL; 2356 } 2357 2358 mmap_buf->req = req; 2359 2360 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 2361 2362 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2363 size, size, mmap_buf, 2364 NULL); 2365 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2366 nxt_unit_mmap_buf_release(mmap_buf); 2367 2368 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); 2369 2370 return NULL; 2371 } 2372 2373 return &mmap_buf->buf; 2374 } 2375 2376 2377 static nxt_unit_mmap_buf_t * 2378 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 2379 { 2380 nxt_unit_mmap_buf_t *mmap_buf; 2381 nxt_unit_ctx_impl_t *ctx_impl; 2382 2383 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2384 2385 pthread_mutex_lock(&ctx_impl->mutex); 2386 2387 if (ctx_impl->free_buf == NULL) { 2388 pthread_mutex_unlock(&ctx_impl->mutex); 2389 2390 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); 2391 if (nxt_slow_path(mmap_buf == NULL)) { 2392 return NULL; 2393 } 2394 2395 } else { 2396 mmap_buf = ctx_impl->free_buf; 2397 2398 nxt_unit_mmap_buf_unlink(mmap_buf); 2399 2400 pthread_mutex_unlock(&ctx_impl->mutex); 2401 } 2402 2403 mmap_buf->ctx_impl = ctx_impl; 2404 2405 mmap_buf->hdr = NULL; 2406 mmap_buf->free_ptr = NULL; 2407 2408 return mmap_buf; 2409 } 2410 2411 2412 static void 2413 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 2414 { 2415 nxt_unit_mmap_buf_unlink(mmap_buf); 2416 2417 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 2418 2419 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 2420 2421 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 2422 } 2423 2424 2425 int 2426 
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Switches the request into WebSocket mode: registers it in the context's
 * request hash and forces a 101 status.  Requires an initialized, not yet
 * sent response.  Idempotent: a second call is a no-op.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_request_hash_add(req->ctx, req);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/* Recovers the request from the extra_data area handed to the application. */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Sends a previously allocated response body buffer.  Headers must have
 * been sent already; an empty buffer is freed without transmitting.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t,
buf); 2504 2505 req = mmap_buf->req; 2506 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2507 2508 nxt_unit_req_debug(req, "buf_send: %d bytes", 2509 (int) (buf->free - buf->start)); 2510 2511 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2512 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 2513 2514 return NXT_UNIT_ERROR; 2515 } 2516 2517 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2518 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 2519 2520 return NXT_UNIT_ERROR; 2521 } 2522 2523 if (nxt_fast_path(buf->free > buf->start)) { 2524 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2525 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2526 return rc; 2527 } 2528 } 2529 2530 nxt_unit_mmap_buf_free(mmap_buf); 2531 2532 return NXT_UNIT_OK; 2533 } 2534 2535 2536 static void 2537 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 2538 { 2539 int rc; 2540 nxt_unit_mmap_buf_t *mmap_buf; 2541 nxt_unit_request_info_t *req; 2542 2543 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2544 2545 req = mmap_buf->req; 2546 2547 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); 2548 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 2549 nxt_unit_mmap_buf_free(mmap_buf); 2550 2551 nxt_unit_request_info_release(req); 2552 2553 } else { 2554 nxt_unit_request_done(req, rc); 2555 } 2556 } 2557 2558 2559 static int 2560 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 2561 nxt_unit_mmap_buf_t *mmap_buf, int last) 2562 { 2563 struct { 2564 nxt_port_msg_t msg; 2565 nxt_port_mmap_msg_t mmap_msg; 2566 } m; 2567 2568 int rc; 2569 u_char *last_used, *first_free; 2570 ssize_t res; 2571 nxt_chunk_id_t first_free_chunk; 2572 nxt_unit_buf_t *buf; 2573 nxt_unit_impl_t *lib; 2574 nxt_port_mmap_header_t *hdr; 2575 nxt_unit_request_info_impl_t *req_impl; 2576 2577 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 2578 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2579 2580 buf = 
&mmap_buf->buf; 2581 hdr = mmap_buf->hdr; 2582 2583 m.mmap_msg.size = buf->free - buf->start; 2584 2585 m.msg.stream = req_impl->stream; 2586 m.msg.pid = lib->pid; 2587 m.msg.reply_port = 0; 2588 m.msg.type = _NXT_PORT_MSG_DATA; 2589 m.msg.last = last != 0; 2590 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2591 m.msg.nf = 0; 2592 m.msg.mf = 0; 2593 m.msg.tracking = 0; 2594 2595 rc = NXT_UNIT_ERROR; 2596 2597 if (m.msg.mmap) { 2598 m.mmap_msg.mmap_id = hdr->id; 2599 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2600 (u_char *) buf->start); 2601 2602 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2603 req_impl->stream, 2604 (int) m.mmap_msg.mmap_id, 2605 (int) m.mmap_msg.chunk_id, 2606 (int) m.mmap_msg.size); 2607 2608 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), 2609 NULL, 0); 2610 if (nxt_slow_path(res != sizeof(m))) { 2611 goto free_buf; 2612 } 2613 2614 last_used = (u_char *) buf->free - 1; 2615 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2616 2617 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2618 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2619 2620 buf->start = (char *) first_free; 2621 buf->free = buf->start; 2622 2623 if (buf->end < buf->start) { 2624 buf->end = buf->start; 2625 } 2626 2627 } else { 2628 buf->start = NULL; 2629 buf->free = NULL; 2630 buf->end = NULL; 2631 2632 mmap_buf->hdr = NULL; 2633 } 2634 2635 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, 2636 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2637 2638 nxt_unit_debug(req->ctx, "allocated_chunks %d", 2639 (int) lib->outgoing.allocated_chunks); 2640 2641 } else { 2642 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2643 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2644 { 2645 nxt_unit_alert(req->ctx, 2646 "#%"PRIu32": failed to send plain memory buffer" 2647 ": no space reserved for message header", 2648 req_impl->stream); 2649 2650 goto free_buf; 2651 } 2652 2653 
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); 2654 2655 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", 2656 req_impl->stream, 2657 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2658 2659 res = nxt_unit_port_send(req->ctx, req->response_port, 2660 buf->start - sizeof(m.msg), 2661 m.mmap_msg.size + sizeof(m.msg), 2662 NULL, 0); 2663 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2664 goto free_buf; 2665 } 2666 } 2667 2668 rc = NXT_UNIT_OK; 2669 2670 free_buf: 2671 2672 nxt_unit_free_outgoing_buf(mmap_buf); 2673 2674 return rc; 2675 } 2676 2677 2678 void 2679 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2680 { 2681 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2682 } 2683 2684 2685 static void 2686 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2687 { 2688 nxt_unit_free_outgoing_buf(mmap_buf); 2689 2690 nxt_unit_mmap_buf_release(mmap_buf); 2691 } 2692 2693 2694 static void 2695 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2696 { 2697 if (mmap_buf->hdr != NULL) { 2698 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2699 mmap_buf->hdr, mmap_buf->buf.start, 2700 mmap_buf->buf.end - mmap_buf->buf.start); 2701 2702 mmap_buf->hdr = NULL; 2703 2704 return; 2705 } 2706 2707 if (mmap_buf->free_ptr != NULL) { 2708 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); 2709 2710 mmap_buf->free_ptr = NULL; 2711 } 2712 } 2713 2714 2715 static nxt_unit_read_buf_t * 2716 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2717 { 2718 nxt_unit_ctx_impl_t *ctx_impl; 2719 nxt_unit_read_buf_t *rbuf; 2720 2721 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2722 2723 pthread_mutex_lock(&ctx_impl->mutex); 2724 2725 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2726 2727 pthread_mutex_unlock(&ctx_impl->mutex); 2728 2729 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 2730 2731 return rbuf; 2732 } 2733 2734 2735 static nxt_unit_read_buf_t * 2736 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 
2737 { 2738 nxt_queue_link_t *link; 2739 nxt_unit_read_buf_t *rbuf; 2740 2741 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2742 link = nxt_queue_first(&ctx_impl->free_rbuf); 2743 nxt_queue_remove(link); 2744 2745 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); 2746 2747 return rbuf; 2748 } 2749 2750 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); 2751 2752 if (nxt_fast_path(rbuf != NULL)) { 2753 rbuf->ctx_impl = ctx_impl; 2754 } 2755 2756 return rbuf; 2757 } 2758 2759 2760 static void 2761 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2762 nxt_unit_read_buf_t *rbuf) 2763 { 2764 nxt_unit_ctx_impl_t *ctx_impl; 2765 2766 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2767 2768 pthread_mutex_lock(&ctx_impl->mutex); 2769 2770 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); 2771 2772 pthread_mutex_unlock(&ctx_impl->mutex); 2773 } 2774 2775 2776 nxt_unit_buf_t * 2777 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2778 { 2779 nxt_unit_mmap_buf_t *mmap_buf; 2780 2781 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2782 2783 if (mmap_buf->next == NULL) { 2784 return NULL; 2785 } 2786 2787 return &mmap_buf->next->buf; 2788 } 2789 2790 2791 uint32_t 2792 nxt_unit_buf_max(void) 2793 { 2794 return PORT_MMAP_DATA_SIZE; 2795 } 2796 2797 2798 uint32_t 2799 nxt_unit_buf_min(void) 2800 { 2801 return PORT_MMAP_CHUNK_SIZE; 2802 } 2803 2804 2805 int 2806 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2807 size_t size) 2808 { 2809 ssize_t res; 2810 2811 res = nxt_unit_response_write_nb(req, start, size, size); 2812 2813 return res < 0 ? 
           -res : NXT_UNIT_OK;
}


/*
 * Non-blocking response write: copies data into outgoing buffers and sends
 * them.  Waits for shared memory only to satisfy min_size; returns the
 * number of bytes actually sent, or a negative NXT_UNIT_* error code.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check if response is not send yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Piggyback as much content as fits onto the headers buffer. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /* No space available right now: report the partial write. */
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free =
            nxt_cpymem(mmap_buf.buf.free,
                                        part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Streams a response from the application-provided read callback until it
 * reports EOF: content is first piggybacked onto the pending headers
 * buffer, then sent in PORT_MMAP_DATA_SIZE pieces.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Check if response is not send yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes.
             */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        /* Fill the whole buffer before sending it. */
        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Reads request content into dst: first drains the in-memory buffer chain,
 * then reads the spooled remainder from content_fd, which is closed once
 * exhausted.  Returns the total bytes copied, or a negative value on a
 * read error.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    ssize_t  buf_res, res;

    buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
                                dst, size);

    if (buf_res < (ssize_t) size && req->content_fd != -1) {
        /*
         * NOTE(review): the fd read starts at dst, not dst + buf_res;
         * this is harmless only if the buffer chain and the fd are never
         * populated at the same time -- confirm against callers.
         */
        res = read(req->content_fd, dst, size);
        if (nxt_slow_path(res < 0)) {
            nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                               strerror(errno), errno);

            return res;
        }

        if (res < (ssize_t) size) {
            /* Short read: spooled content is exhausted. */
            nxt_unit_close(req->content_fd);

            req->content_fd = -1;
        }
        req->content_length -= res;
        size -= res;

        dst = nxt_pointer_to(dst, res);

    } else {
        res = 0;
    }

    return buf_res + res;
}


/*
 * Determines the length of the next line (up to max_size) in the request
 * content without consuming it, preloading spooled content from content_fd
 * into the buffer chain as needed.  Returns 0 when there is no content,
 * -1 on preread failure.
 */
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
    char                 *p;
    size_t               l_size, b_size;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;

    if (req->content_length == 0) {
        return 0;
    }

    l_size = 0;

    b = req->content_buf;

    while (b != NULL) {
        b_size = b->end - b->free;
        p = memchr(b->free, '\n', b_size);

        if (p != NULL) {
            /* Include the newline itself in the line size. */
            p++;
            l_size += p - b->free;
            break;
        }

        l_size += b_size;

        if (max_size <= l_size) {
            break;
        }

        mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
        if (mmap_buf->next == NULL
            && req->content_fd != -1
            && l_size < req->content_length)
        {
            /* No newline yet: pull the next 16K from the spool file. */
            preread_buf = nxt_unit_request_preread(req, 16384);
            if (nxt_slow_path(preread_buf == NULL)) {
                return -1;
            }

            nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
        }

        b = nxt_unit_buf_next(b);
    }

    return nxt_min(max_size, l_size);
}


/*
 * Reads up to size bytes from content_fd into a freshly allocated plain
 * buffer; closes the fd when it is exhausted.  Returns NULL on failure.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
{
    ssize_t              res;
    nxt_unit_mmap_buf_t  *mmap_buf;

    if (req->content_fd == -1) {
        nxt_unit_req_alert(req, "preread: content_fd == -1");

        return NULL;
    }

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf");

        return NULL;
    }

    mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
    if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf memory");

        nxt_unit_mmap_buf_release(mmap_buf);

        return NULL;
    }
    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (res < (ssize_t) size) {
        /* Short read: the spooled content is fully consumed. */
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Shrink the buffer to what was actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copies up to size bytes from the buffer chain *b into dst, advancing
 * buffer read positions and decreasing *len by the number of bytes
 * copied.  Returns that number.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            /*
             * NOTE(review): the advanced "buf" is discarded here since
             * *b is assigned last_buf below -- confirm this is intended.
             */
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Completes request processing: generates a default 200 "text/plain"
 * response if none was initialized, sends the final buffer, or reports
 * success/error to the router when nothing remains to send.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if
       (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* No response was initialized: produce a default empty one. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain", nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the last buffer and releases the request on success. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    /* Tell the router the request is finished (possibly with an error). */
    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    (void) nxt_unit_port_send(req->ctx, req->response_port,
                              &msg, sizeof(msg), NULL, 0);

    nxt_unit_request_info_release(req);
}


/* Sends a single WebSocket frame from one contiguous memory area. */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Sends a WebSocket frame gathered from iovcnt areas, splitting the
 * payload across multiple outgoing buffers when necessary.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size, alloc_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_unit_mmap_buf_t     mmap_buf;
    nxt_websocket_header_t  *wh;
    char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    payload_len = 0;

    for (i = 0; i < iovcnt;
         i++) {
        payload_len += iov[i].iov_len;
    }

    /* Reserve room for the largest possible frame header. */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    buf->start[0] = 0;
    buf->start[1] = 0;

    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* Current buffer is full: flush it and get a new one. */
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
    }

    return rc;
}


/*
 * Reads frame payload into dst and unmasks it in place with the client
 * mask, keeping the rolling mask offset across partial reads.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[ (i + d) % 4 ];
    }

    return res;
}


/*
 * Makes the frame's data independent of the shared read buffer by copying
 * it into a private allocation, so the frame may outlive the original
 * message.  No-op if the frame already owns its memory.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, hsize;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = nxt_unit_malloc(ws->req->ctx, size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    hsize = nxt_websocket_frame_header_size(b);

    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b + hsize;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    ws_impl->ws.header = (nxt_websocket_header_t *) b;

    /* Re-point the mask into the copied header. */
    if (ws_impl->ws.header->mask) {
        ws_impl->ws.mask = (uint8_t *) b + hsize - 4;

    } else {
        ws_impl->ws.mask = NULL;
    }

    return NXT_UNIT_OK;
}


void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Finds *n (but at least min_n) consecutive free chunks in one of this
 * thread's outgoing shared-memory segments, growing the segment array or
 * waiting for a router SHM_ACK when out of space.  Locks and unlocks
 * outgoing.mutex internally; sets *c to the first chunk on success.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->outgoing.mutex);

retry:

    outgoing_size = lib->outgoing.size;

    mm_end = lib->outgoing.elts + outgoing_size;

    for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Use only segments bound to this port and thread. */
        if (hdr->sent_over != 0xFFFFu
            && (hdr->sent_over != port->id.id
                ||
                    mm->src_thread != pthread_self()))
        {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to extend the run of busy-marked chunks up to *n. */
            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Run too short: roll back and search further on. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment exhausted: mark it out-of-shared-memory. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&lib->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by application, but not send to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition.  Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&lib->outgoing.mutex);

        goto retry;
    }

    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, port, *n);

unlock:

    nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "allocated_chunks %d",
                   (int) lib->outgoing.allocated_chunks);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    return hdr;
}


/* Notifies the router that this process is out of shared memory. */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Blocks reading the context's port until a SHM_ACK message arrives,
 * queuing any other messages for the regular processing loop.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  res;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_ERROR) {
            nxt_unit_read_buf_release(ctx, rbuf);

            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_shm_ack(rbuf)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }
        /* Not the ACK: park the message for later processing. */
        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        if (nxt_unit_is_quit(rbuf)) {
            nxt_unit_debug(ctx, "oosm: quit received");

            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Returns the i-th element of the mmaps array, growing the array
 * geometrically as needed.  Returns NULL if realloc() fails.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t         cap, n;
    nxt_unit_mmap_t  *e;

    if (nxt_fast_path(mmaps->size > i)) {
        return mmaps->elts + i;
    }

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    /* Grow by 2x below 16 elements, by 1.5x afterwards. */
    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
        if (nxt_slow_path(e == NULL)) {
            return NULL;
        }

        mmaps->elts = e;

        /* Initialize the newly added elements. */
        for (n = mmaps->cap; n < cap; n++) {
            e = mmaps->elts + n;

            e->hdr = NULL;
            nxt_queue_init(&e->awaiting_rbuf);
        }

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Creates a new shared-memory segment, maps it, marks the first n chunks
 * busy, and passes the descriptor to the router.  Called with
 * outgoing.mutex held; the mutex is dropped around the port send.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
               fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* Initially all chunks are free. */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = lib->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = port->id.pid;
    hdr->sent_over = port->id.id;
    mm->src_thread = pthread_self();

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* Drop the mutex while talking to the router. */
    pthread_mutex_unlock(&lib->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) port->id.pid);
    }

    nxt_unit_close(fd);

    pthread_mutex_lock(&lib->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    lib->outgoing.size--;

    return NULL;
}


/*
 * Opens an anonymous shared-memory descriptor of the given size using the
 * best mechanism available at build time: memfd_create(),
 * shm_open(SHM_ANON), or shm_open() followed by shm_unlink().
 * Returns -1 on failure.
 */
static int
nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
{
    int              fd;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
    char  name[64];

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());
#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name,
MFD_CLOEXEC); 3776 if (nxt_slow_path(fd == -1)) { 3777 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3778 strerror(errno), errno); 3779 3780 return -1; 3781 } 3782 3783 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3784 3785 #elif (NXT_HAVE_SHM_OPEN_ANON) 3786 3787 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3788 if (nxt_slow_path(fd == -1)) { 3789 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3790 strerror(errno), errno); 3791 3792 return -1; 3793 } 3794 3795 #elif (NXT_HAVE_SHM_OPEN) 3796 3797 /* Just in case. */ 3798 shm_unlink(name); 3799 3800 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3801 if (nxt_slow_path(fd == -1)) { 3802 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3803 strerror(errno), errno); 3804 3805 return -1; 3806 } 3807 3808 if (nxt_slow_path(shm_unlink(name) == -1)) { 3809 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3810 strerror(errno), errno); 3811 } 3812 3813 #else 3814 3815 #error No working shared memory implementation. 3816 3817 #endif 3818 3819 if (nxt_slow_path(ftruncate(fd, size) == -1)) { 3820 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3821 strerror(errno), errno); 3822 3823 nxt_unit_close(fd); 3824 3825 return -1; 3826 } 3827 3828 return fd; 3829 } 3830 3831 3832 static int 3833 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) 3834 { 3835 ssize_t res; 3836 nxt_port_msg_t msg; 3837 nxt_unit_impl_t *lib; 3838 union { 3839 struct cmsghdr cm; 3840 char space[CMSG_SPACE(sizeof(int))]; 3841 } cmsg; 3842 3843 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3844 3845 msg.stream = 0; 3846 msg.pid = lib->pid; 3847 msg.reply_port = 0; 3848 msg.type = _NXT_PORT_MSG_MMAP; 3849 msg.last = 0; 3850 msg.mmap = 0; 3851 msg.nf = 0; 3852 msg.mf = 0; 3853 msg.tracking = 0; 3854 3855 /* 3856 * Fill all padding fields with 0. 3857 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 
3858 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 3859 */ 3860 memset(&cmsg, 0, sizeof(cmsg)); 3861 3862 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3863 cmsg.cm.cmsg_level = SOL_SOCKET; 3864 cmsg.cm.cmsg_type = SCM_RIGHTS; 3865 3866 /* 3867 * memcpy() is used instead of simple 3868 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3869 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3870 * dereferencing type-punned pointer will break strict-aliasing rules 3871 * 3872 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3873 * in the same simple assignment as in the code above. 3874 */ 3875 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3876 3877 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), 3878 &cmsg, sizeof(cmsg)); 3879 if (nxt_slow_path(res != sizeof(msg))) { 3880 return NXT_UNIT_ERROR; 3881 } 3882 3883 return NXT_UNIT_OK; 3884 } 3885 3886 3887 static int 3888 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 3889 uint32_t size, uint32_t min_size, 3890 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) 3891 { 3892 int nchunks, min_nchunks; 3893 nxt_chunk_id_t c; 3894 nxt_port_mmap_header_t *hdr; 3895 3896 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { 3897 if (local_buf != NULL) { 3898 mmap_buf->free_ptr = NULL; 3899 mmap_buf->plain_ptr = local_buf; 3900 3901 } else { 3902 mmap_buf->free_ptr = nxt_unit_malloc(ctx, 3903 size + sizeof(nxt_port_msg_t)); 3904 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3905 return NXT_UNIT_ERROR; 3906 } 3907 3908 mmap_buf->plain_ptr = mmap_buf->free_ptr; 3909 } 3910 3911 mmap_buf->hdr = NULL; 3912 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); 3913 mmap_buf->buf.free = mmap_buf->buf.start; 3914 mmap_buf->buf.end = mmap_buf->buf.start + size; 3915 3916 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", 3917 mmap_buf->buf.start, (int) size); 3918 3919 return NXT_UNIT_OK; 3920 } 3921 3922 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / 
PORT_MMAP_CHUNK_SIZE; 3923 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3924 3925 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3926 if (nxt_slow_path(hdr == NULL)) { 3927 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3928 mmap_buf->hdr = NULL; 3929 mmap_buf->buf.start = NULL; 3930 mmap_buf->buf.free = NULL; 3931 mmap_buf->buf.end = NULL; 3932 mmap_buf->free_ptr = NULL; 3933 3934 return NXT_UNIT_OK; 3935 } 3936 3937 return NXT_UNIT_ERROR; 3938 } 3939 3940 mmap_buf->hdr = hdr; 3941 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3942 mmap_buf->buf.free = mmap_buf->buf.start; 3943 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3944 mmap_buf->free_ptr = NULL; 3945 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3946 3947 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3948 (int) hdr->id, (int) c, 3949 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3950 3951 return NXT_UNIT_OK; 3952 } 3953 3954 3955 static int 3956 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3957 { 3958 int rc; 3959 void *mem; 3960 nxt_queue_t awaiting_rbuf; 3961 struct stat mmap_stat; 3962 nxt_unit_mmap_t *mm; 3963 nxt_unit_impl_t *lib; 3964 nxt_unit_ctx_impl_t *ctx_impl; 3965 nxt_unit_read_buf_t *rbuf; 3966 nxt_port_mmap_header_t *hdr; 3967 3968 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3969 3970 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3971 3972 if (fstat(fd, &mmap_stat) == -1) { 3973 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3974 strerror(errno), errno); 3975 3976 return NXT_UNIT_ERROR; 3977 } 3978 3979 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3980 MAP_SHARED, fd, 0); 3981 if (nxt_slow_path(mem == MAP_FAILED)) { 3982 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3983 strerror(errno), errno); 3984 3985 return NXT_UNIT_ERROR; 3986 } 3987 3988 
hdr = mem; 3989 3990 if (nxt_slow_path(hdr->src_pid != pid)) { 3991 3992 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " 3993 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3994 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3995 3996 munmap(mem, PORT_MMAP_SIZE); 3997 3998 return NXT_UNIT_ERROR; 3999 } 4000 4001 nxt_queue_init(&awaiting_rbuf); 4002 4003 pthread_mutex_lock(&lib->incoming.mutex); 4004 4005 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); 4006 if (nxt_slow_path(mm == NULL)) { 4007 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); 4008 4009 munmap(mem, PORT_MMAP_SIZE); 4010 4011 rc = NXT_UNIT_ERROR; 4012 4013 } else { 4014 mm->hdr = hdr; 4015 4016 hdr->sent_over = 0xFFFFu; 4017 4018 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 4019 nxt_queue_init(&mm->awaiting_rbuf); 4020 4021 rc = NXT_UNIT_OK; 4022 } 4023 4024 pthread_mutex_unlock(&lib->incoming.mutex); 4025 4026 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 4027 4028 ctx_impl = rbuf->ctx_impl; 4029 4030 pthread_mutex_lock(&ctx_impl->mutex); 4031 4032 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 4033 4034 pthread_mutex_unlock(&ctx_impl->mutex); 4035 4036 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 4037 4038 nxt_unit_awake_ctx(ctx, ctx_impl); 4039 4040 } nxt_queue_loop; 4041 4042 return rc; 4043 } 4044 4045 4046 static void 4047 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) 4048 { 4049 nxt_port_msg_t msg; 4050 4051 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { 4052 return; 4053 } 4054 4055 if (nxt_slow_path(ctx_impl->read_port == NULL 4056 || ctx_impl->read_port->out_fd == -1)) 4057 { 4058 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); 4059 4060 return; 4061 } 4062 4063 memset(&msg, 0, sizeof(nxt_port_msg_t)); 4064 4065 msg.type = _NXT_PORT_MSG_RPC_READY; 4066 4067 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 4068 &msg, sizeof(msg), NULL, 0); 4069 } 4070 
4071 4072 static void 4073 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 4074 { 4075 pthread_mutex_init(&mmaps->mutex, NULL); 4076 4077 mmaps->size = 0; 4078 mmaps->cap = 0; 4079 mmaps->elts = NULL; 4080 mmaps->allocated_chunks = 0; 4081 } 4082 4083 4084 nxt_inline void 4085 nxt_unit_process_use(nxt_unit_process_t *process) 4086 { 4087 nxt_atomic_fetch_add(&process->use_count, 1); 4088 } 4089 4090 4091 nxt_inline void 4092 nxt_unit_process_release(nxt_unit_process_t *process) 4093 { 4094 long c; 4095 4096 c = nxt_atomic_fetch_add(&process->use_count, -1); 4097 4098 if (c == 1) { 4099 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); 4100 4101 nxt_unit_free(NULL, process); 4102 } 4103 } 4104 4105 4106 static void 4107 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) 4108 { 4109 nxt_unit_mmap_t *mm, *end; 4110 4111 if (mmaps->elts != NULL) { 4112 end = mmaps->elts + mmaps->size; 4113 4114 for (mm = mmaps->elts; mm < end; mm++) { 4115 munmap(mm->hdr, PORT_MMAP_SIZE); 4116 } 4117 4118 nxt_unit_free(NULL, mmaps->elts); 4119 } 4120 4121 pthread_mutex_destroy(&mmaps->mutex); 4122 } 4123 4124 4125 static int 4126 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, 4127 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, 4128 nxt_unit_read_buf_t *rbuf) 4129 { 4130 int res, need_rbuf; 4131 nxt_unit_mmap_t *mm; 4132 nxt_unit_ctx_impl_t *ctx_impl; 4133 4134 mm = nxt_unit_mmap_at(mmaps, id); 4135 if (nxt_slow_path(mm == NULL)) { 4136 nxt_unit_alert(ctx, "failed to allocate mmap"); 4137 4138 pthread_mutex_unlock(&mmaps->mutex); 4139 4140 *hdr = NULL; 4141 4142 return NXT_UNIT_ERROR; 4143 } 4144 4145 *hdr = mm->hdr; 4146 4147 if (nxt_fast_path(*hdr != NULL)) { 4148 return NXT_UNIT_OK; 4149 } 4150 4151 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); 4152 4153 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); 4154 4155 pthread_mutex_unlock(&mmaps->mutex); 4156 4157 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4158 4159 
/*
 * Translates an incoming mmap-based message into a chain of buffer
 * structures pointing at shared memory chunks.
 *
 * recv_msg->start initially points at an array of nxt_port_mmap_msg_t
 * descriptors; on success it is redirected to the first chunk's data.
 * Allocated buffers are linked into recv_msg->incoming_buf so that the
 * caller can release them later.  Returns NXT_UNIT_OK, NXT_UNIT_AGAIN
 * (a referenced segment is not attached yet), or NXT_UNIT_ERROR.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_read_buf_t *rbuf)
{
    int                     res;
    void                    *start;
    uint32_t                size;
    nxt_unit_impl_t         *lib;
    nxt_unit_mmaps_t        *mmaps;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* Allocating buffer structures. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            /* Unwind the buffers allocated so far. */
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mmaps = &lib->incoming;

    pthread_mutex_lock(&mmaps->mutex);

    for (; mmap_msg < end; mmap_msg++) {
        /*
         * NOTE: on a non-OK result the callee has already released
         * mmaps->mutex; only the OK path returns with it still held.
         */
        res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
                                       recv_msg->pid, mmap_msg->mmap_id,
                                       &hdr, rbuf);

        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            /* Mutex already unlocked by nxt_unit_check_rbuf_mmap(). */
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return res;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* The first descriptor also becomes the message payload. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&mmaps->mutex);

    return NXT_UNIT_OK;
}
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4279 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4280 4281 memset(&m.msg, 0, sizeof(nxt_port_msg_t)); 4282 4283 m.msg.pid = lib->pid; 4284 m.msg.reply_port = ctx_impl->read_port->id.id; 4285 m.msg.type = _NXT_PORT_MSG_GET_MMAP; 4286 4287 m.get_mmap.id = id; 4288 4289 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); 4290 4291 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); 4292 if (nxt_slow_path(res != sizeof(m))) { 4293 return NXT_UNIT_ERROR; 4294 } 4295 4296 return NXT_UNIT_OK; 4297 } 4298 4299 4300 static void 4301 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, 4302 void *start, uint32_t size) 4303 { 4304 int freed_chunks; 4305 u_char *p, *end; 4306 nxt_chunk_id_t c; 4307 nxt_unit_impl_t *lib; 4308 4309 memset(start, 0xA5, size); 4310 4311 p = start; 4312 end = p + size; 4313 c = nxt_port_mmap_chunk_id(hdr, p); 4314 freed_chunks = 0; 4315 4316 while (p < end) { 4317 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 4318 4319 p += PORT_MMAP_CHUNK_SIZE; 4320 c++; 4321 freed_chunks++; 4322 } 4323 4324 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4325 4326 if (hdr->src_pid == lib->pid && freed_chunks != 0) { 4327 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); 4328 4329 nxt_unit_debug(ctx, "allocated_chunks %d", 4330 (int) lib->outgoing.allocated_chunks); 4331 } 4332 4333 if (hdr->dst_pid == lib->pid 4334 && freed_chunks != 0 4335 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 4336 { 4337 nxt_unit_send_shm_ack(ctx, hdr->src_pid); 4338 } 4339 } 4340 4341 4342 static int 4343 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) 4344 { 4345 ssize_t res; 4346 nxt_port_msg_t msg; 4347 nxt_unit_impl_t *lib; 4348 4349 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4350 4351 msg.stream = 0; 4352 msg.pid = lib->pid; 4353 msg.reply_port = 0; 4354 msg.type = _NXT_PORT_MSG_SHM_ACK; 4355 msg.last = 0; 4356 
msg.mmap = 0; 4357 msg.nf = 0; 4358 msg.mf = 0; 4359 msg.tracking = 0; 4360 4361 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); 4362 if (nxt_slow_path(res != sizeof(msg))) { 4363 return NXT_UNIT_ERROR; 4364 } 4365 4366 return NXT_UNIT_OK; 4367 } 4368 4369 4370 static nxt_int_t 4371 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 4372 { 4373 nxt_process_t *process; 4374 4375 process = data; 4376 4377 if (lhq->key.length == sizeof(pid_t) 4378 && *(pid_t *) lhq->key.start == process->pid) 4379 { 4380 return NXT_OK; 4381 } 4382 4383 return NXT_DECLINED; 4384 } 4385 4386 4387 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 4388 NXT_LVLHSH_DEFAULT, 4389 nxt_unit_lvlhsh_pid_test, 4390 nxt_unit_lvlhsh_alloc, 4391 nxt_unit_lvlhsh_free, 4392 }; 4393 4394 4395 static inline void 4396 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 4397 { 4398 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 4399 lhq->key.length = sizeof(*pid); 4400 lhq->key.start = (u_char *) pid; 4401 lhq->proto = &lvlhsh_processes_proto; 4402 } 4403 4404 4405 static nxt_unit_process_t * 4406 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 4407 { 4408 nxt_unit_impl_t *lib; 4409 nxt_unit_process_t *process; 4410 nxt_lvlhsh_query_t lhq; 4411 4412 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4413 4414 nxt_unit_process_lhq_pid(&lhq, &pid); 4415 4416 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 4417 process = lhq.value; 4418 nxt_unit_process_use(process); 4419 4420 return process; 4421 } 4422 4423 process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t)); 4424 if (nxt_slow_path(process == NULL)) { 4425 nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid); 4426 4427 return NULL; 4428 } 4429 4430 process->pid = pid; 4431 process->use_count = 2; 4432 process->next_port_id = 0; 4433 process->lib = lib; 4434 4435 nxt_queue_init(&process->ports); 4436 4437 lhq.replace = 0; 4438 
lhq.value = process; 4439 4440 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 4441 4442 case NXT_OK: 4443 break; 4444 4445 default: 4446 nxt_unit_alert(ctx, "process %d insert failed", (int) pid); 4447 4448 nxt_unit_free(ctx, process); 4449 process = NULL; 4450 break; 4451 } 4452 4453 return process; 4454 } 4455 4456 4457 static nxt_unit_process_t * 4458 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) 4459 { 4460 int rc; 4461 nxt_lvlhsh_query_t lhq; 4462 4463 nxt_unit_process_lhq_pid(&lhq, &pid); 4464 4465 if (remove) { 4466 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 4467 4468 } else { 4469 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 4470 } 4471 4472 if (rc == NXT_OK) { 4473 if (!remove) { 4474 nxt_unit_process_use(lhq.value); 4475 } 4476 4477 return lhq.value; 4478 } 4479 4480 return NULL; 4481 } 4482 4483 4484 static nxt_unit_process_t * 4485 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 4486 { 4487 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 4488 } 4489 4490 4491 int 4492 nxt_unit_run(nxt_unit_ctx_t *ctx) 4493 { 4494 int rc; 4495 nxt_unit_ctx_impl_t *ctx_impl; 4496 4497 nxt_unit_ctx_use(ctx); 4498 4499 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4500 4501 rc = NXT_UNIT_OK; 4502 4503 while (nxt_fast_path(ctx_impl->online)) { 4504 rc = nxt_unit_run_once_impl(ctx); 4505 4506 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4507 nxt_unit_quit(ctx); 4508 break; 4509 } 4510 } 4511 4512 nxt_unit_ctx_release(ctx); 4513 4514 return rc; 4515 } 4516 4517 4518 int 4519 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 4520 { 4521 int rc; 4522 4523 nxt_unit_ctx_use(ctx); 4524 4525 rc = nxt_unit_run_once_impl(ctx); 4526 4527 nxt_unit_ctx_release(ctx); 4528 4529 return rc; 4530 } 4531 4532 4533 static int 4534 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) 4535 { 4536 int rc; 4537 nxt_unit_read_buf_t *rbuf; 4538 4539 rbuf = nxt_unit_read_buf_get(ctx); 4540 if (nxt_slow_path(rbuf == NULL)) { 4541 return NXT_UNIT_ERROR; 
/*
 * Fills "rbuf" with the next message for this context.
 *
 * Fast paths: if the context has outstanding wait items or is not ready
 * yet, read the private port directly.  Otherwise try the lock-free port
 * queue and the shared application queue first, and only then block in
 * poll() on both file descriptors.  Returns NXT_UNIT_OK, NXT_UNIT_AGAIN,
 * or NXT_UNIT_ERROR.
 */
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                   nevents, res, err;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port_impl;
    struct pollfd         fds[2];

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Waiting for a specific reply, or not ready: read the port directly. */
    if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
        return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
    }

    port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
                                 port);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

retry:

    /* Drain the port's shared queue unless a socket read is pending. */
    if (port_impl->from_socket == 0) {
        res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_OK) {
            if (nxt_unit_is_read_socket(rbuf)) {
                /* Marker message: the real payload awaits on the socket. */
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               port_impl->from_socket);

            } else {
                nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               (int) rbuf->size);

                return NXT_UNIT_OK;
            }
        }
    }

    /* Try the shared application queue without blocking. */
    res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (res == NXT_UNIT_OK) {
        return NXT_UNIT_OK;
    }

    /* Nothing queued: block on both descriptors. */
    fds[0].fd = ctx_impl->read_port->in_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    fds[1].fd = lib->shared_port->in_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    nevents = poll(fds, 2, -1);
    if (nxt_slow_path(nevents == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
                       fds[0].fd, fds[1].fd, strerror(err), err);

        rbuf->size = -1;

        return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    /* The private port takes priority over the shared one. */
    if ((fds[0].revents & POLLIN) != 0) {
        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    if ((fds[1].revents & POLLIN) != 0) {
        res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    return NXT_UNIT_ERROR;
}
/*
 * Dispatches requests whose headers have been fully received.
 *
 * The ready list is detached from the context under its mutex, then each
 * request gets a headers ACK; requests still expecting body data are
 * registered in the request hash so the data can be matched later.  The
 * application's request_handler is invoked outside the lock.
 */
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
    int                           res;
    nxt_queue_t                   ready_req;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return;
    }

    /* Detach the whole list so the lock is not held during callbacks. */
    nxt_queue_init(&ready_req);

    nxt_queue_add(&ready_req, &ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->ready_req);

    pthread_mutex_unlock(&ctx_impl->mutex);

    nxt_queue_each(req_impl, &ready_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

        req = &req_impl->req;

        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            continue;
        }

        /* More body bytes expected than are present in the buffer? */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                continue;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                continue;
            }
        }

        lib->callbacks.request_handler(&req_impl->req);

    } nxt_queue_loop;
}
_NXT_PORT_MSG_SHM_ACK; 4861 } 4862 4863 return 0; 4864 } 4865 4866 4867 nxt_inline int 4868 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) 4869 { 4870 nxt_port_msg_t *port_msg; 4871 4872 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { 4873 port_msg = (nxt_port_msg_t *) rbuf->buf; 4874 4875 return port_msg->type == _NXT_PORT_MSG_QUIT; 4876 } 4877 4878 return 0; 4879 } 4880 4881 4882 int 4883 nxt_unit_run_shared(nxt_unit_ctx_t *ctx) 4884 { 4885 int rc; 4886 nxt_unit_impl_t *lib; 4887 nxt_unit_read_buf_t *rbuf; 4888 nxt_unit_ctx_impl_t *ctx_impl; 4889 4890 nxt_unit_ctx_use(ctx); 4891 4892 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4893 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4894 4895 rc = NXT_UNIT_OK; 4896 4897 while (nxt_fast_path(ctx_impl->online)) { 4898 rbuf = nxt_unit_read_buf_get(ctx); 4899 if (nxt_slow_path(rbuf == NULL)) { 4900 rc = NXT_UNIT_ERROR; 4901 break; 4902 } 4903 4904 retry: 4905 4906 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); 4907 if (rc == NXT_UNIT_AGAIN) { 4908 goto retry; 4909 } 4910 4911 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4912 nxt_unit_read_buf_release(ctx, rbuf); 4913 break; 4914 } 4915 4916 rc = nxt_unit_process_msg(ctx, rbuf, NULL); 4917 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 4918 break; 4919 } 4920 } 4921 4922 nxt_unit_ctx_release(ctx); 4923 4924 return rc; 4925 } 4926 4927 4928 nxt_unit_request_info_t * 4929 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) 4930 { 4931 int rc; 4932 nxt_unit_impl_t *lib; 4933 nxt_unit_read_buf_t *rbuf; 4934 nxt_unit_ctx_impl_t *ctx_impl; 4935 nxt_unit_request_info_t *req; 4936 4937 nxt_unit_ctx_use(ctx); 4938 4939 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4940 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4941 4942 req = NULL; 4943 4944 if (nxt_slow_path(!ctx_impl->online)) { 4945 goto done; 4946 } 4947 4948 rbuf = nxt_unit_read_buf_get(ctx); 4949 if (nxt_slow_path(rbuf == NULL)) { 4950 goto done; 
4951 } 4952 4953 rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); 4954 if (rc != NXT_UNIT_OK) { 4955 goto done; 4956 } 4957 4958 (void) nxt_unit_process_msg(ctx, rbuf, &req); 4959 4960 done: 4961 4962 nxt_unit_ctx_release(ctx); 4963 4964 return req; 4965 } 4966 4967 4968 int 4969 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) 4970 { 4971 nxt_unit_impl_t *lib; 4972 4973 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4974 4975 return (ctx == &lib->main_ctx.ctx); 4976 } 4977 4978 4979 int 4980 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4981 { 4982 int rc; 4983 4984 nxt_unit_ctx_use(ctx); 4985 4986 rc = nxt_unit_process_port_msg_impl(ctx, port); 4987 4988 nxt_unit_ctx_release(ctx); 4989 4990 return rc; 4991 } 4992 4993 4994 static int 4995 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4996 { 4997 int rc; 4998 nxt_unit_impl_t *lib; 4999 nxt_unit_read_buf_t *rbuf; 5000 nxt_unit_ctx_impl_t *ctx_impl; 5001 5002 rbuf = nxt_unit_read_buf_get(ctx); 5003 if (nxt_slow_path(rbuf == NULL)) { 5004 return NXT_UNIT_ERROR; 5005 } 5006 5007 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5008 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5009 5010 retry: 5011 5012 if (port == lib->shared_port) { 5013 rc = nxt_unit_shared_port_recv(ctx, port, rbuf); 5014 5015 } else { 5016 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); 5017 } 5018 5019 if (rc != NXT_UNIT_OK) { 5020 nxt_unit_read_buf_release(ctx, rbuf); 5021 return rc; 5022 } 5023 5024 rc = nxt_unit_process_msg(ctx, rbuf, NULL); 5025 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 5026 return NXT_UNIT_ERROR; 5027 } 5028 5029 rc = nxt_unit_process_pending_rbuf(ctx); 5030 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { 5031 return NXT_UNIT_ERROR; 5032 } 5033 5034 nxt_unit_process_ready_req(ctx); 5035 5036 rbuf = nxt_unit_read_buf_get(ctx); 5037 if (nxt_slow_path(rbuf == NULL)) { 5038 return NXT_UNIT_ERROR; 5039 } 5040 5041 if (ctx_impl->online) { 5042 goto 
/*
 * Allocates and registers a new worker context.
 *
 * Creates the per-context read port, backs its message queue with an
 * anonymous shared memory segment, and announces the port (with the
 * queue fd) to the router.  "data" becomes the user data of the new
 * context.  Returns the new context, or NULL on failure with all
 * partially acquired resources released.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                   rc, queue_fd;
    void                  *mem;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *port;
    nxt_unit_ctx_impl_t   *new_ctx;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Trailing request_data_size bytes are per-request user data. */
    new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
                              + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_free(ctx, new_ctx);

        return NULL;
    }

    queue_fd = -1;

    port = nxt_unit_create_port(&new_ctx->ctx);
    if (nxt_slow_path(port == NULL)) {
        goto fail;
    }

    new_ctx->read_port = port;

    /* Shared memory segment backing the port's message queue. */
    queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    port_impl->queue = mem;

    /* Announce the new port (and its queue fd) to the router. */
    rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    nxt_unit_close(queue_fd);

    return &new_ctx->ctx;

fail:

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* Releasing the context tears down everything acquired above. */
    nxt_unit_ctx_release(&new_ctx->ctx);

    return NULL;
}
/*
 * Frees a context and everything it owns: aborts still-active requests,
 * releases cached buffers/requests/websocket frames, destroys the context
 * mutex, unlinks the context from the library list, and removes the
 * context's read port.  The main context's storage itself is embedded in
 * the library object and is not freed here.
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_read_buf_t              *rbuf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    /* Requests still in flight at teardown are finished with an error. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    /* Drain the free-buffer cache. */
    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        nxt_unit_free(&ctx_impl->ctx, mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);

    } nxt_queue_loop;

    nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
    {
        /* ctx_read_buf is embedded in ctx_impl, never heap-allocated. */
        if (rbuf != &ctx_impl->ctx_read_buf) {
            nxt_unit_free(&ctx_impl->ctx, rbuf);
        }
    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_remove(&ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
        nxt_unit_port_release(ctx_impl->read_port);
    }

    /* The main context is embedded in 'lib'; only extra contexts are freed. */
    if (ctx_impl != &lib->main_ctx) {
        nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/*
 * Fills a port id (pid, id) and precomputes its hash so later lookups in
 * the ports lvlhsh avoid rehashing.
 */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Creates a new bidirectional port backed by an AF_UNIX socketpair and
 * registers it in the library's port hash.  The port id is taken from the
 * owning process's next_port_id counter under lib->mutex.
 *
 * Returns the registered port, or NULL on failure (sockets closed).
 */
static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port, *port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NULL;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);

        return NULL;
    }

    /* Allocate the next free port id for this process under the lock. */
    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    new_port.in_fd = port_sockets[0];
    new_port.out_fd = port_sockets[1];
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    nxt_unit_process_release(process);

    port = nxt_unit_add_port(ctx, &new_port, NULL);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);
    }

    return port;
}
/*
 * Announces 'port' to the peer behind 'dst' with a _NXT_PORT_MSG_NEW_PORT
 * message, passing the port's out_fd and the queue fd as SCM_RIGHTS
 * ancillary data.  Returns NXT_UNIT_OK if the whole message was sent.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
    nxt_unit_port_t *port, int queue_fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;
    int              fds[2] = { port->out_fd, queue_fd };

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_new_port_t  new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int) * 2)];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    m.msg.stream = 0;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    m.new_port.id = port->id.id;
    m.new_port.pid = port->id.pid;
    m.new_port.type = NXT_PROCESS_APP;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);

    res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));

    return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/* Takes an extra reference on the port. */
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    nxt_atomic_fetch_add(&port_impl->use_count, 1);
}


/*
 * Drops a port reference; the last reference destroys the port: releases
 * the owning process ref, closes both fds, unmaps the queue (sized by
 * whether this is the shared app queue or a private port queue) and frees
 * the impl structure.
 */
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
{
    long                  c;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    /* fetch_add returns the previous value; 1 means we held the last ref. */
    c = nxt_atomic_fetch_add(&port_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
                       (int) port->id.pid, (int) port->id.id,
                       port->in_fd, port->out_fd);

        nxt_unit_process_release(port_impl->process);

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);

            port->in_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);

            port->out_fd = -1;
        }

        if (port_impl->queue != NULL) {
            munmap(port_impl->queue,
                   (port->id.id == NXT_UNIT_SHARED_PORT_ID)
                   ? sizeof(nxt_app_queue_t)
                   : sizeof(nxt_port_queue_t));
        }

        nxt_unit_free(NULL, port_impl);
    }
}


/*
 * Registers 'port' in the library port hash, or — if a port with the same
 * id already exists — merges the new fds/data/queue into the existing
 * entry and releases the duplicates.  Once a port becomes usable
 * ("ready"), requests parked on its awaiting_req queue are moved to their
 * contexts' ready queues.
 *
 * Returns the registered (or pre-existing) port, or NULL on failure.
 */
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
    int                   rc, ready;
    nxt_queue_t           awaiting_req;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *old_port;
    nxt_unit_process_t    *process;
    nxt_unit_port_impl_t  *new_port, *old_port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);

    if (nxt_slow_path(old_port != NULL)) {
        nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
                       "in_fd %d out_fd %d queue %p",
                       port->id.pid, port->id.id,
                       port->in_fd, port->out_fd, queue);

        /* Adopt data/fds the existing entry is missing; close the rest. */
        if (old_port->data == NULL) {
            old_port->data = port->data;
            port->data = NULL;
        }

        if (old_port->in_fd == -1) {
            old_port->in_fd = port->in_fd;
            port->in_fd = -1;
        }

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);
            port->in_fd = -1;
        }

        if (old_port->out_fd == -1) {
            old_port->out_fd = port->out_fd;
            port->out_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);
            port->out_fd = -1;
        }

        /* Reflect the merged state back to the caller's struct. */
        *port = *old_port;

        nxt_queue_init(&awaiting_req);

        old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t,
                                         port);

        if (old_port_impl->queue == NULL) {
            old_port_impl->queue = queue;
        }

        ready = (port->in_fd != -1 || port->out_fd != -1);

        /*
         * Port can be marked as 'ready' only after callbacks.add_port()
         * call.  Otherwise, request may try to use the port before
         * callback.
         */
        if (lib->callbacks.add_port == NULL && ready) {
            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }
        }

        pthread_mutex_unlock(&lib->mutex);

        /* Run the user callback outside the lock, then mark ready. */
        if (lib->callbacks.add_port != NULL && ready) {
            lib->callbacks.add_port(ctx, old_port);

            pthread_mutex_lock(&lib->mutex);

            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }

            pthread_mutex_unlock(&lib->mutex);
        }

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);

        return old_port;
    }

    new_port = NULL;
    ready = 0;

    nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
                   port->id.pid, port->id.id,
                   port->in_fd, port->out_fd, queue);

    process = nxt_unit_process_get(ctx, port->id.pid);
    if (nxt_slow_path(process == NULL)) {
        goto unlock;
    }

    /* Keep next_port_id ahead of any externally assigned id. */
    if (port->id.id != NXT_UNIT_SHARED_PORT_ID
        && port->id.id >= process->next_port_id)
    {
        process->next_port_id = port->id.id + 1;
    }

    new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(new_port == NULL)) {
        nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
                       port->id.pid, port->id.id);

        goto unlock;
    }

    new_port->port = *port;

    rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        nxt_unit_free(ctx, new_port);

        new_port = NULL;

        goto unlock;
    }

    nxt_queue_insert_tail(&process->ports, &new_port->link);

    /* One reference for the hash, one for the returned pointer. */
    new_port->use_count = 2;
    new_port->process = process;
    new_port->queue = queue;
    new_port->from_socket = 0;
    new_port->socket_rbuf = NULL;

    nxt_queue_init(&new_port->awaiting_req);

    ready = (port->in_fd != -1 || port->out_fd != -1);

    if (lib->callbacks.add_port == NULL) {
        new_port->ready = ready;

    } else {
        /* Deferred until the callback below has run. */
        new_port->ready = 0;
    }

    /* Ownership of the process ref moved into new_port->process. */
    process = NULL;

unlock:

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process != NULL)) {
        nxt_unit_process_release(process);
    }

    if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
        lib->callbacks.add_port(ctx, &new_port->port);

        nxt_queue_init(&awaiting_req);

        pthread_mutex_lock(&lib->mutex);

        new_port->ready = 1;

        if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
            nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
            nxt_queue_init(&new_port->awaiting_req);
        }

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);
    }

    return (new_port == NULL) ? NULL : &new_port->port;
}


/*
 * Moves each request waiting on a (now ready) port to its own context's
 * ready_req queue, decrements that context's wait counter, and wakes the
 * context.
 */
static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    nxt_queue_each(req_impl, awaiting_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        nxt_queue_remove(&req_impl->port_wait_link);

        ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
                                    ctx);

        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->ready_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);

        nxt_unit_awake_ctx(ctx, ctx_impl);

    } nxt_queue_loop;
}


/*
 * Removes a port from the hash and its process list (under lib->mutex),
 * notifies the remove_port callback outside the lock, then drops the
 * hash's reference.
 */
static void
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t       *port;
    nxt_unit_port_impl_t  *port_impl;

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_remove_port_unsafe(lib, port_id);

    if (nxt_fast_path(port != NULL)) {
        port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

        nxt_queue_remove(&port_impl->link);
    }

    pthread_mutex_unlock(&lib->mutex);

    if (lib->callbacks.remove_port != NULL && port != NULL) {
        lib->callbacks.remove_port(&lib->unit, port);
    }

    if (nxt_fast_path(port != NULL)) {
        nxt_unit_port_release(port);
    }
}


/*
 * Deletes the port from the hash and returns it (still holding the
 * hash's reference), or NULL if not found.  Caller must hold lib->mutex
 * — hence "_unsafe".
 */
static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t  *port;

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
                       (int) port_id->pid, (int) port_id->id);

        return NULL;
    }

    nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   port->in_fd, port->out_fd, port->data);

    return port;
}


/*
 * Removes a peer process and all its ports.  Locks lib->mutex here;
 * nxt_unit_remove_process() below is responsible for unlocking it.
 */
static void
nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
    nxt_unit_process_t  *process;

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(lib, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /* NOTE: releases lib->mutex before returning. */
    nxt_unit_remove_process(lib, process);

    if (lib->callbacks.remove_pid != NULL) {
        lib->callbacks.remove_pid(&lib->unit, pid);
    }
}


/*
 * Detaches all ports of 'process' from the hash while lib->mutex is held
 * (acquired by the caller), then unlocks and releases the ports and the
 * process outside the lock.
 */
static void
nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_port_impl_t  *port;

    nxt_queue_init(&ports);

    /* Steal the whole port list so it can be drained unlocked. */
    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_remove_port_unsafe(lib, &port->port.id);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        if (lib->callbacks.remove_port != NULL) {
            lib->callbacks.remove_port(&lib->unit, &port->port);
        }

        nxt_unit_port_release(&port->port);

    } nxt_queue_loop;

    nxt_unit_process_release(process);
}


/*
 * Marks the context offline and runs the quit callback.  When called on
 * the main context, additionally broadcasts a QUIT message to every other
 * context's read port so their loops terminate too.
 */
static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_port_msg_t       msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (!ctx_impl->online) {
        return;
    }

    ctx_impl->online = 0;

    if (lib->callbacks.quit != NULL) {
        lib->callbacks.quit(ctx);
    }

    /* Only the main context propagates the shutdown. */
    if (ctx != &lib->main_ctx.ctx) {
        return;
    }

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.pid = lib->pid;
    msg.type = _NXT_PORT_MSG_QUIT;

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        if (ctx == &ctx_impl->ctx
            || ctx_impl->read_port == NULL
            || ctx_impl->read_port->out_fd == -1)
        {
            continue;
        }

        (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
                                  &msg, sizeof(msg), NULL, 0);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);
}


/*
 * Asks the router for a port's fds via _NXT_PORT_MSG_GET_PORT; the reply
 * is addressed to this context's read port.
 */
static int
nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_port_t  get_port;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_PORT;

    m.get_port.id = port_id->id;
    m.get_port.pid = port_id->pid;

    nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
                   (int) port_id->id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Sends a message to a port, preferring the shared-memory queue for small
 * messages without ancillary data.  When the queue transitions from empty
 * ('notify' set), a READ_QUEUE wake-up is sent over the socket (or the
 * user's port_send callback).  Messages that cannot use the queue go over
 * the socket, preceded by a READ_SOCKET marker in the queue to preserve
 * ordering.  Returns bytes sent or -1.
 */
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   notify;
    ssize_t               ret;
    nxt_int_t             rc;
    nxt_port_msg_t        msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    if (port_impl->queue != NULL && oob_size == 0
        && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
    {
        rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) buf_size, notify);

        if (notify) {
            /* Reuse the original header, retyped as a queue wake-up. */
            memcpy(&msg, buf, sizeof(nxt_port_msg_t));

            msg.type = _NXT_PORT_MSG_READ_QUEUE;

            if (lib->callbacks.port_send == NULL) {
                ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
                                       sizeof(nxt_port_msg_t), NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);

            } else {
                ret = lib->callbacks.port_send(ctx, port, &msg,
                                               sizeof(nxt_port_msg_t),
                                               NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);
            }

        }

        return buf_size;
    }

    /* Queue exists but message must use the socket: keep ordering. */
    if (port_impl->queue != NULL) {
        msg.type = _NXT_PORT_MSG_READ_SOCKET;

        rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
                       (int) port->id.pid, (int) port->id.id, notify);
    }

    if (lib->callbacks.port_send != NULL) {
        ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
                                       oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);

    } else {
        ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
                               oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);
    }

    return ret;
}


/*
 * Thin sendmsg(2) wrapper: retries on EINTR, logs failures, and passes
 * 'oob' as the control (ancillary) buffer.  Returns the sendmsg() result.
 */
static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int            err;
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

retry:

    res = sendmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        /*
         * FIXME: This should be "alert" after router graceful shutdown
         * implementation.
         */
        nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(err), err);

    } else {
        nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Receives one message for a private context port, multiplexing between
 * the shared-memory queue and the socket while preserving message order:
 * READ_SOCKET markers in the queue force a socket read; socket messages
 * arriving early are suspended in socket_rbuf until their marker appears.
 */
static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int                   res, read;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    read = 0;

retry:

    if (port_impl->from_socket > 0) {
        /* A socket message is owed; use the suspended one if present. */
        if (port_impl->socket_rbuf != NULL
            && port_impl->socket_rbuf->size > 0)
        {
            port_impl->from_socket--;

            nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
            port_impl->socket_rbuf->size = 0;

            nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }

    } else {
        res = nxt_unit_port_queue_recv(port, rbuf);

        if (res == NXT_UNIT_OK) {
            /* Marker: the next real message must be read from the socket. */
            if (nxt_unit_is_read_socket(rbuf)) {
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) port->id.pid, (int) port->id.id,
                               port_impl->from_socket);

                goto retry;
            }

            nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }
    }

    /* Only one socket read per call; a second attempt reports AGAIN. */
    if (read) {
        return NXT_UNIT_AGAIN;
    }

    res = nxt_unit_port_recv(ctx, port, rbuf);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    read = 1;

    /* Wake-up message only: actual payloads are waiting in the queue. */
    if (nxt_unit_is_read_queue(rbuf)) {
        nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                       (int) port->id.pid, (int) port->id.id,
                       (int) rbuf->size);

        goto retry;
    }

    nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (res == NXT_UNIT_AGAIN) {
        return NXT_UNIT_AGAIN;
    }

    /* Socket message that was already announced by a queue marker. */
    if (port_impl->from_socket > 0) {
        port_impl->from_socket--;

        return NXT_UNIT_OK;
    }

    /* Socket message arrived before its marker: park it for later. */
    nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (port_impl->socket_rbuf == NULL) {
        port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);

        if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        port_impl->socket_rbuf->size = 0;
    }

    /* Only one message may be suspended at a time. */
    if (port_impl->socket_rbuf->size > 0) {
        nxt_unit_alert(ctx, "too many port socket messages");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);

    /* Clear stale ancillary data before reusing rbuf for the queue. */
    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    goto retry;
}


/* Copies payload and ancillary (oob) data between read buffers. */
nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
{
    memcpy(dst->buf, src->buf, src->size);
    dst->size = src->size;
    memcpy(dst->oob, src->oob, sizeof(src->oob));
}


/*
 * Receives from the shared application port: tries the app queue first,
 * falls back to the socket, and re-polls the queue when the socket only
 * delivered a READ_QUEUE wake-up.
 */
static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int  res;

retry:

    res = nxt_unit_app_queue_recv(port, rbuf);

    if (res == NXT_UNIT_AGAIN) {
        res = nxt_unit_port_recv(ctx, port, rbuf);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_read_queue(rbuf)) {
            nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            goto retry;
        }
    }

    return res;
}


/*
 * Reads one datagram from the port's socket into rbuf (payload + oob),
 * or delegates to the user's port_recv callback when installed.  Retries
 * EINTR; EAGAIN maps to NXT_UNIT_AGAIN.
 */
static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int              fd, err;
    struct iovec     iov[1];
    struct msghdr    msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.port_recv != NULL) {
        rbuf->size = lib->callbacks.port_recv(ctx, port,
                                              rbuf->buf, sizeof(rbuf->buf),
                                              rbuf->oob, sizeof(rbuf->oob));

        nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) rbuf->size);

        if (nxt_slow_path(rbuf->size < 0)) {
            return NXT_UNIT_ERROR;
        }

        return NXT_UNIT_OK;
    }

    iov[0].iov_base = rbuf->buf;
    iov[0].iov_len = sizeof(rbuf->buf);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = rbuf->oob;
    msg.msg_controllen = sizeof(rbuf->oob);

    fd = port->in_fd;

retry:

    rbuf->size = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(rbuf->size == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        if (err == EAGAIN) {
            nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
                           fd, strerror(err), err);

            return NXT_UNIT_AGAIN;
        }

        nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
                       fd, strerror(err), err);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);

    return NXT_UNIT_OK;
}


/* Non-blocking dequeue from a private port queue; AGAIN when empty. */
static int
nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);

    return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/*
 * Non-blocking dequeue from the shared app queue.  A dequeued message is
 * only accepted if nxt_app_queue_cancel() confirms this consumer owns it
 * (another process may have raced for the same slot); otherwise retry.
 */
static int
nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    uint32_t              cookie;
    nxt_port_msg_t        *port_msg;
    nxt_app_queue_t       *queue;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    queue = port_impl->queue;

retry:

    rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);

    nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);

    if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
            return NXT_UNIT_OK;
        }

        nxt_unit_debug(NULL, "app_queue_recv: message cancelled");

        goto retry;
    }

    return (rbuf->size == -1) ?
NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/* close(2) wrapper with library-style logging; returns close()'s result. */
nxt_inline int
nxt_unit_close(int fd)
{
    int  res;

    res = close(fd);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
                       fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(NULL, "close(%d): %d", fd, res);
    }

    return res;
}


/* Switches fd to blocking mode (clears FIONBIO). */
static int
nxt_unit_fd_blocking(int fd)
{
    int  nb;

    nb = 0;

    if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
        nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
                       fd, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/* lvlhsh collision test for the ports hash: match on (pid, id). */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Prepares an lvlhsh query for a port lookup.  'port_hash_id' is
 * caller-provided storage that must outlive the query.  The computed hash
 * is cached back into port_id so it is calculated at most once.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/* Inserts a port into the hash; fails on duplicates (replace == 0). */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Looks up (and optionally deletes) a port by id.  On a plain find the
 * returned port gets an extra reference for the caller; on remove the
 * caller inherits the hash's reference instead.
 */
static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        if (!remove) {
            nxt_unit_port_use(lhq.value);
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Collision test for the requests hash: always accepts.  Stream ids are
 * compared via the 32-bit key itself, so hash+key equality suffices.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Indexes a request by its stream id in the context's request hash
 * (under ctx mutex).  Idempotent: a request already in the hash is left
 * alone.
 */
static int
nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
    nxt_unit_request_info_t *req)
{
    uint32_t                      *stream;
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
    if (req_impl->in_hash) {
        return NXT_UNIT_OK;
    }

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl->in_hash = 1;
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Finds (and optionally removes) a request by stream id in the context's
 * request hash, under the ctx mutex.  Returns the public request struct
 * or NULL.
 */
static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (remove) {
        res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);

    } else {
        res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
                                    req);
        if (remove) {
            req_impl->in_hash = 0;
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Formats and writes one log line ("prefix + message + \n") to the
 * library log fd, or to stderr when called without a context.  Overlong
 * messages are truncated with a trailing "[...]".
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* vsnprintf() returns the would-be length; clamp on truncation. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Like nxt_unit_log() but prefixes the request's stream id ("#N: ") so
 * log lines can be correlated to a request.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: 
%.*s", (int) (p - msg), msg); 6537 } 6538 } 6539 6540 6541 static const char * nxt_unit_log_levels[] = { 6542 "alert", 6543 "error", 6544 "warn", 6545 "notice", 6546 "info", 6547 "debug", 6548 }; 6549 6550 6551 static char * 6552 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 6553 { 6554 struct tm tm; 6555 struct timespec ts; 6556 6557 (void) clock_gettime(CLOCK_REALTIME, &ts); 6558 6559 #if (NXT_HAVE_LOCALTIME_R) 6560 (void) localtime_r(&ts.tv_sec, &tm); 6561 #else 6562 tm = *localtime(&ts.tv_sec); 6563 #endif 6564 6565 #if (NXT_DEBUG) 6566 p += snprintf(p, end - p, 6567 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 6568 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6569 tm.tm_hour, tm.tm_min, tm.tm_sec, 6570 (int) ts.tv_nsec / 1000000); 6571 #else 6572 p += snprintf(p, end - p, 6573 "%4d/%02d/%02d %02d:%02d:%02d ", 6574 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6575 tm.tm_hour, tm.tm_min, tm.tm_sec); 6576 #endif 6577 6578 p += snprintf(p, end - p, 6579 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 6580 (int) pid, 6581 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 6582 6583 return p; 6584 } 6585 6586 6587 static void * 6588 nxt_unit_lvlhsh_alloc(void *data, size_t size) 6589 { 6590 int err; 6591 void *p; 6592 6593 err = posix_memalign(&p, size, size); 6594 6595 if (nxt_fast_path(err == 0)) { 6596 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", 6597 (int) size, (int) size, p); 6598 return p; 6599 } 6600 6601 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", 6602 (int) size, (int) size, strerror(err), err); 6603 return NULL; 6604 } 6605 6606 6607 static void 6608 nxt_unit_lvlhsh_free(void *data, void *p) 6609 { 6610 nxt_unit_free(NULL, p); 6611 } 6612 6613 6614 void * 6615 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) 6616 { 6617 void *p; 6618 6619 p = malloc(size); 6620 6621 if (nxt_fast_path(p != NULL)) { 6622 #if (NXT_DEBUG_ALLOC) 6623 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); 6624 #endif 6625 6626 } else 
{ 6627 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", 6628 (int) size, strerror(errno), errno); 6629 } 6630 6631 return p; 6632 } 6633 6634 6635 void 6636 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) 6637 { 6638 #if (NXT_DEBUG_ALLOC) 6639 nxt_unit_debug(ctx, "free(%p)", p); 6640 #endif 6641 6642 free(p); 6643 } 6644 6645 6646 static int 6647 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) 6648 { 6649 u_char c1, c2; 6650 nxt_int_t n; 6651 const u_char *s1, *s2; 6652 6653 s1 = p1; 6654 s2 = p2; 6655 6656 while (length-- != 0) { 6657 c1 = *s1++; 6658 c2 = *s2++; 6659 6660 c1 = nxt_lowcase(c1); 6661 c2 = nxt_lowcase(c2); 6662 6663 n = c1 - c2; 6664 6665 if (n != 0) { 6666 return n; 6667 } 6668 } 6669 6670 return 0; 6671 } 6672