1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 #include "nxt_port_queue.h" 11 #include "nxt_app_queue.h" 12 13 #include "nxt_unit.h" 14 #include "nxt_unit_request.h" 15 #include "nxt_unit_response.h" 16 #include "nxt_unit_websocket.h" 17 18 #include "nxt_websocket.h" 19 20 #if (NXT_HAVE_MEMFD_CREATE) 21 #include <linux/memfd.h> 22 #endif 23 24 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 25 #define NXT_UNIT_LOCAL_BUF_SIZE \ 26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 27 28 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 29 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 30 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 31 typedef struct nxt_unit_process_s nxt_unit_process_t; 32 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 33 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 34 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 35 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 36 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 37 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 38 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 39 40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 42 nxt_unit_ctx_impl_t *ctx_impl, void *data); 43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 50 nxt_unit_mmap_buf_t *mmap_buf); 51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 54 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 56 int queue_fd); 57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 58 nxt_unit_request_info_t **preq); 59 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 62 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 63 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); 64 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 65 nxt_unit_recv_msg_t *recv_msg); 66 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 67 nxt_unit_port_id_t *port_id); 68 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 69 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 70 nxt_unit_recv_msg_t *recv_msg); 71 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 72 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 73 nxt_unit_ctx_t *ctx); 74 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 75 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 76 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 77 nxt_unit_ctx_t *ctx); 78 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 79 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 80 nxt_unit_websocket_frame_impl_t *ws); 81 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 82 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 83 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 84 nxt_unit_mmap_buf_t *mmap_buf, int last); 85 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 86 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 87 
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 88 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 89 nxt_unit_ctx_impl_t *ctx_impl); 90 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 91 nxt_unit_read_buf_t *rbuf); 92 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 93 nxt_unit_request_info_t *req, size_t size); 94 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 95 size_t size); 96 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 97 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 98 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 99 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 100 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 101 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 102 nxt_unit_port_t *port, int n); 103 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 104 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 105 int fd); 106 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 107 nxt_unit_port_t *port, uint32_t size, 108 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 109 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 110 111 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 112 nxt_unit_ctx_impl_t *ctx_impl); 113 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 114 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 115 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 116 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 117 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 118 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 119 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 120 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 121 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t 
*rbuf); 122 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); 123 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 124 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 125 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 126 127 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 128 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 129 pid_t pid, int remove); 130 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 131 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 132 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 133 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 134 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 135 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 136 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 137 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 138 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 139 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 140 nxt_unit_port_t *port); 141 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 142 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 143 144 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 145 nxt_unit_port_t *port, int queue_fd); 146 147 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 148 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 149 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 150 nxt_unit_port_t *port, void *queue); 151 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 152 nxt_queue_t *awaiting_req); 153 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 154 nxt_unit_port_id_t *port_id); 155 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 156 nxt_unit_port_id_t *port_id); 157 
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 158 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 159 nxt_unit_process_t *process); 160 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 161 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 162 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 163 nxt_unit_port_t *port, const void *buf, size_t buf_size, 164 const void *oob, size_t oob_size); 165 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 166 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 167 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 168 nxt_unit_read_buf_t *rbuf); 169 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 170 nxt_unit_read_buf_t *src); 171 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 172 nxt_unit_read_buf_t *rbuf); 173 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 174 nxt_unit_read_buf_t *rbuf); 175 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 176 nxt_unit_read_buf_t *rbuf); 177 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, 178 nxt_unit_read_buf_t *rbuf); 179 nxt_inline int nxt_unit_close(int fd); 180 static int nxt_unit_fd_blocking(int fd); 181 182 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 183 nxt_unit_port_t *port); 184 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 185 nxt_unit_port_id_t *port_id, int remove); 186 187 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 188 nxt_unit_request_info_t *req); 189 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 190 nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 191 192 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 193 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 194 static void nxt_unit_lvlhsh_free(void *data, void *p); 195 static int nxt_unit_memcasecmp(const 
void *p1, const void *p2, size_t length); 196 197 198 struct nxt_unit_mmap_buf_s { 199 nxt_unit_buf_t buf; 200 201 nxt_unit_mmap_buf_t *next; 202 nxt_unit_mmap_buf_t **prev; 203 204 nxt_port_mmap_header_t *hdr; 205 nxt_unit_request_info_t *req; 206 nxt_unit_ctx_impl_t *ctx_impl; 207 char *free_ptr; 208 char *plain_ptr; 209 }; 210 211 212 struct nxt_unit_recv_msg_s { 213 uint32_t stream; 214 nxt_pid_t pid; 215 nxt_port_id_t reply_port; 216 217 uint8_t last; /* 1 bit */ 218 uint8_t mmap; /* 1 bit */ 219 220 void *start; 221 uint32_t size; 222 223 int fd[2]; 224 225 nxt_unit_mmap_buf_t *incoming_buf; 226 }; 227 228 229 typedef enum { 230 NXT_UNIT_RS_START = 0, 231 NXT_UNIT_RS_RESPONSE_INIT, 232 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 233 NXT_UNIT_RS_RESPONSE_SENT, 234 NXT_UNIT_RS_RELEASED, 235 } nxt_unit_req_state_t; 236 237 238 struct nxt_unit_request_info_impl_s { 239 nxt_unit_request_info_t req; 240 241 uint32_t stream; 242 243 nxt_unit_mmap_buf_t *outgoing_buf; 244 nxt_unit_mmap_buf_t *incoming_buf; 245 246 nxt_unit_req_state_t state; 247 uint8_t websocket; 248 uint8_t in_hash; 249 250 /* for nxt_unit_ctx_impl_t.free_req or active_req */ 251 nxt_queue_link_t link; 252 /* for nxt_unit_port_impl_t.awaiting_req */ 253 nxt_queue_link_t port_wait_link; 254 255 char extra_data[]; 256 }; 257 258 259 struct nxt_unit_websocket_frame_impl_s { 260 nxt_unit_websocket_frame_t ws; 261 262 nxt_unit_mmap_buf_t *buf; 263 264 nxt_queue_link_t link; 265 266 nxt_unit_ctx_impl_t *ctx_impl; 267 }; 268 269 270 struct nxt_unit_read_buf_s { 271 nxt_queue_link_t link; 272 nxt_unit_ctx_impl_t *ctx_impl; 273 ssize_t size; 274 char buf[16384]; 275 char oob[256]; 276 }; 277 278 279 struct nxt_unit_ctx_impl_s { 280 nxt_unit_ctx_t ctx; 281 282 nxt_atomic_t use_count; 283 nxt_atomic_t wait_items; 284 285 pthread_mutex_t mutex; 286 287 nxt_unit_port_t *read_port; 288 289 nxt_queue_link_t link; 290 291 nxt_unit_mmap_buf_t *free_buf; 292 293 /* of nxt_unit_request_info_impl_t */ 294 nxt_queue_t free_req; 
295 296 /* of nxt_unit_websocket_frame_impl_t */ 297 nxt_queue_t free_ws; 298 299 /* of nxt_unit_request_info_impl_t */ 300 nxt_queue_t active_req; 301 302 /* of nxt_unit_request_info_impl_t */ 303 nxt_lvlhsh_t requests; 304 305 /* of nxt_unit_request_info_impl_t */ 306 nxt_queue_t ready_req; 307 308 /* of nxt_unit_read_buf_t */ 309 nxt_queue_t pending_rbuf; 310 311 /* of nxt_unit_read_buf_t */ 312 nxt_queue_t free_rbuf; 313 314 int online; 315 int ready; 316 317 nxt_unit_mmap_buf_t ctx_buf[2]; 318 nxt_unit_read_buf_t ctx_read_buf; 319 320 nxt_unit_request_info_impl_t req; 321 }; 322 323 324 struct nxt_unit_mmap_s { 325 nxt_port_mmap_header_t *hdr; 326 pthread_t src_thread; 327 328 /* of nxt_unit_read_buf_t */ 329 nxt_queue_t awaiting_rbuf; 330 }; 331 332 333 struct nxt_unit_mmaps_s { 334 pthread_mutex_t mutex; 335 uint32_t size; 336 uint32_t cap; 337 nxt_atomic_t allocated_chunks; 338 nxt_unit_mmap_t *elts; 339 }; 340 341 342 struct nxt_unit_impl_s { 343 nxt_unit_t unit; 344 nxt_unit_callbacks_t callbacks; 345 346 nxt_atomic_t use_count; 347 348 uint32_t request_data_size; 349 uint32_t shm_mmap_limit; 350 351 pthread_mutex_t mutex; 352 353 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 354 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 355 356 nxt_unit_port_t *router_port; 357 nxt_unit_port_t *shared_port; 358 359 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 360 361 nxt_unit_mmaps_t incoming; 362 nxt_unit_mmaps_t outgoing; 363 364 pid_t pid; 365 int log_fd; 366 367 nxt_unit_ctx_impl_t main_ctx; 368 }; 369 370 371 struct nxt_unit_port_impl_s { 372 nxt_unit_port_t port; 373 374 nxt_atomic_t use_count; 375 376 /* for nxt_unit_process_t.ports */ 377 nxt_queue_link_t link; 378 nxt_unit_process_t *process; 379 380 /* of nxt_unit_request_info_impl_t */ 381 nxt_queue_t awaiting_req; 382 383 int ready; 384 385 void *queue; 386 387 int from_socket; 388 nxt_unit_read_buf_t *socket_rbuf; 389 }; 390 391 392 struct nxt_unit_process_s { 393 pid_t pid; 394 395 
nxt_queue_t ports; /* of nxt_unit_port_impl_t */ 396 397 nxt_unit_impl_t *lib; 398 399 nxt_atomic_t use_count; 400 401 uint32_t next_port_id; 402 }; 403 404 405 /* Explicitly using 32 bit types to avoid possible alignment. */ 406 typedef struct { 407 int32_t pid; 408 uint32_t id; 409 } nxt_unit_port_hash_id_t; 410 411 412 nxt_unit_ctx_t * 413 nxt_unit_init(nxt_unit_init_t *init) 414 { 415 int rc, queue_fd; 416 void *mem; 417 uint32_t ready_stream, shm_limit; 418 nxt_unit_ctx_t *ctx; 419 nxt_unit_impl_t *lib; 420 nxt_unit_port_t ready_port, router_port, read_port; 421 422 lib = nxt_unit_create(init); 423 if (nxt_slow_path(lib == NULL)) { 424 return NULL; 425 } 426 427 queue_fd = -1; 428 mem = MAP_FAILED; 429 430 if (init->ready_port.id.pid != 0 431 && init->ready_stream != 0 432 && init->read_port.id.pid != 0) 433 { 434 ready_port = init->ready_port; 435 ready_stream = init->ready_stream; 436 router_port = init->router_port; 437 read_port = init->read_port; 438 lib->log_fd = init->log_fd; 439 440 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 441 ready_port.id.id); 442 nxt_unit_port_id_init(&router_port.id, router_port.id.pid, 443 router_port.id.id); 444 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 445 read_port.id.id); 446 447 } else { 448 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, 449 &lib->log_fd, &ready_stream, &shm_limit); 450 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 451 goto fail; 452 } 453 454 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) 455 / PORT_MMAP_DATA_SIZE; 456 } 457 458 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { 459 lib->shm_mmap_limit = 1; 460 } 461 462 lib->pid = read_port.id.pid; 463 464 ctx = &lib->main_ctx.ctx; 465 466 rc = nxt_unit_fd_blocking(router_port.out_fd); 467 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 468 goto fail; 469 } 470 471 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); 472 if (nxt_slow_path(lib->router_port == NULL)) { 473 nxt_unit_alert(NULL, "failed to add 
router_port"); 474 475 goto fail; 476 } 477 478 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); 479 if (nxt_slow_path(queue_fd == -1)) { 480 goto fail; 481 } 482 483 mem = mmap(NULL, sizeof(nxt_port_queue_t), 484 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); 485 if (nxt_slow_path(mem == MAP_FAILED)) { 486 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, 487 strerror(errno), errno); 488 489 goto fail; 490 } 491 492 nxt_port_queue_init(mem); 493 494 rc = nxt_unit_fd_blocking(read_port.in_fd); 495 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 496 goto fail; 497 } 498 499 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); 500 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { 501 nxt_unit_alert(NULL, "failed to add read_port"); 502 503 goto fail; 504 } 505 506 rc = nxt_unit_fd_blocking(ready_port.out_fd); 507 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 508 goto fail; 509 } 510 511 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); 512 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 513 nxt_unit_alert(NULL, "failed to send READY message"); 514 515 goto fail; 516 } 517 518 nxt_unit_close(ready_port.out_fd); 519 nxt_unit_close(queue_fd); 520 521 return ctx; 522 523 fail: 524 525 if (mem != MAP_FAILED) { 526 munmap(mem, sizeof(nxt_port_queue_t)); 527 } 528 529 if (queue_fd != -1) { 530 nxt_unit_close(queue_fd); 531 } 532 533 nxt_unit_ctx_release(&lib->main_ctx.ctx); 534 535 return NULL; 536 } 537 538 539 static nxt_unit_impl_t * 540 nxt_unit_create(nxt_unit_init_t *init) 541 { 542 int rc; 543 nxt_unit_impl_t *lib; 544 nxt_unit_callbacks_t *cb; 545 546 lib = nxt_unit_malloc(NULL, 547 sizeof(nxt_unit_impl_t) + init->request_data_size); 548 if (nxt_slow_path(lib == NULL)) { 549 nxt_unit_alert(NULL, "failed to allocate unit struct"); 550 551 return NULL; 552 } 553 554 rc = pthread_mutex_init(&lib->mutex, NULL); 555 if (nxt_slow_path(rc != 0)) { 556 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 557 558 goto fail; 559 } 
560 561 lib->unit.data = init->data; 562 lib->callbacks = init->callbacks; 563 564 lib->request_data_size = init->request_data_size; 565 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) 566 / PORT_MMAP_DATA_SIZE; 567 568 lib->processes.slot = NULL; 569 lib->ports.slot = NULL; 570 571 lib->log_fd = STDERR_FILENO; 572 573 nxt_queue_init(&lib->contexts); 574 575 lib->use_count = 0; 576 lib->router_port = NULL; 577 lib->shared_port = NULL; 578 579 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 580 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 581 pthread_mutex_destroy(&lib->mutex); 582 goto fail; 583 } 584 585 cb = &lib->callbacks; 586 587 if (cb->request_handler == NULL) { 588 nxt_unit_alert(NULL, "request_handler is NULL"); 589 590 pthread_mutex_destroy(&lib->mutex); 591 goto fail; 592 } 593 594 nxt_unit_mmaps_init(&lib->incoming); 595 nxt_unit_mmaps_init(&lib->outgoing); 596 597 return lib; 598 599 fail: 600 601 nxt_unit_free(NULL, lib); 602 603 return NULL; 604 } 605 606 607 static int 608 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 609 void *data) 610 { 611 int rc; 612 613 ctx_impl->ctx.data = data; 614 ctx_impl->ctx.unit = &lib->unit; 615 616 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 617 if (nxt_slow_path(rc != 0)) { 618 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 619 620 return NXT_UNIT_ERROR; 621 } 622 623 nxt_unit_lib_use(lib); 624 625 pthread_mutex_lock(&lib->mutex); 626 627 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 628 629 pthread_mutex_unlock(&lib->mutex); 630 631 ctx_impl->use_count = 1; 632 ctx_impl->wait_items = 0; 633 ctx_impl->online = 1; 634 ctx_impl->ready = 0; 635 636 nxt_queue_init(&ctx_impl->free_req); 637 nxt_queue_init(&ctx_impl->free_ws); 638 nxt_queue_init(&ctx_impl->active_req); 639 nxt_queue_init(&ctx_impl->ready_req); 640 nxt_queue_init(&ctx_impl->pending_rbuf); 641 nxt_queue_init(&ctx_impl->free_rbuf); 642 643 ctx_impl->free_buf = NULL; 644 
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 645 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 646 647 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 648 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); 649 650 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; 651 652 ctx_impl->req.req.ctx = &ctx_impl->ctx; 653 ctx_impl->req.req.unit = &lib->unit; 654 655 ctx_impl->read_port = NULL; 656 ctx_impl->requests.slot = 0; 657 658 return NXT_UNIT_OK; 659 } 660 661 662 nxt_inline void 663 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) 664 { 665 nxt_unit_ctx_impl_t *ctx_impl; 666 667 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 668 669 nxt_atomic_fetch_add(&ctx_impl->use_count, 1); 670 } 671 672 673 nxt_inline void 674 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) 675 { 676 long c; 677 nxt_unit_ctx_impl_t *ctx_impl; 678 679 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 680 681 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); 682 683 if (c == 1) { 684 nxt_unit_ctx_free(ctx_impl); 685 } 686 } 687 688 689 nxt_inline void 690 nxt_unit_lib_use(nxt_unit_impl_t *lib) 691 { 692 nxt_atomic_fetch_add(&lib->use_count, 1); 693 } 694 695 696 nxt_inline void 697 nxt_unit_lib_release(nxt_unit_impl_t *lib) 698 { 699 long c; 700 nxt_unit_process_t *process; 701 702 c = nxt_atomic_fetch_add(&lib->use_count, -1); 703 704 if (c == 1) { 705 for ( ;; ) { 706 pthread_mutex_lock(&lib->mutex); 707 708 process = nxt_unit_process_pop_first(lib); 709 if (process == NULL) { 710 pthread_mutex_unlock(&lib->mutex); 711 712 break; 713 } 714 715 nxt_unit_remove_process(lib, process); 716 } 717 718 pthread_mutex_destroy(&lib->mutex); 719 720 if (nxt_fast_path(lib->router_port != NULL)) { 721 nxt_unit_port_release(lib->router_port); 722 } 723 724 if (nxt_fast_path(lib->shared_port != NULL)) { 725 nxt_unit_port_release(lib->shared_port); 726 } 727 728 nxt_unit_mmaps_destroy(&lib->incoming); 729 
nxt_unit_mmaps_destroy(&lib->outgoing); 730 731 nxt_unit_free(NULL, lib); 732 } 733 } 734 735 736 nxt_inline void 737 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 738 nxt_unit_mmap_buf_t *mmap_buf) 739 { 740 mmap_buf->next = *head; 741 742 if (mmap_buf->next != NULL) { 743 mmap_buf->next->prev = &mmap_buf->next; 744 } 745 746 *head = mmap_buf; 747 mmap_buf->prev = head; 748 } 749 750 751 nxt_inline void 752 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 753 nxt_unit_mmap_buf_t *mmap_buf) 754 { 755 while (*prev != NULL) { 756 prev = &(*prev)->next; 757 } 758 759 nxt_unit_mmap_buf_insert(prev, mmap_buf); 760 } 761 762 763 nxt_inline void 764 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) 765 { 766 nxt_unit_mmap_buf_t **prev; 767 768 prev = mmap_buf->prev; 769 770 if (mmap_buf->next != NULL) { 771 mmap_buf->next->prev = prev; 772 } 773 774 if (prev != NULL) { 775 *prev = mmap_buf->next; 776 } 777 } 778 779 780 static int 781 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, 782 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, 783 uint32_t *shm_limit) 784 { 785 int rc; 786 int ready_fd, router_fd, read_in_fd, read_out_fd; 787 char *unit_init, *version_end, *vars; 788 size_t version_length; 789 int64_t ready_pid, router_pid, read_pid; 790 uint32_t ready_stream, router_id, ready_id, read_id; 791 792 unit_init = getenv(NXT_UNIT_INIT_ENV); 793 if (nxt_slow_path(unit_init == NULL)) { 794 nxt_unit_alert(NULL, "%s is not in the current environment", 795 NXT_UNIT_INIT_ENV); 796 797 return NXT_UNIT_ERROR; 798 } 799 800 version_end = strchr(unit_init, ';'); 801 if (nxt_slow_path(version_end == NULL)) { 802 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", 803 NXT_UNIT_INIT_ENV, unit_init); 804 805 return NXT_UNIT_ERROR; 806 } 807 808 version_length = version_end - unit_init; 809 810 rc = version_length != nxt_length(NXT_VERSION) 811 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); 812 813 if 
(nxt_slow_path(rc != 0)) { 814 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " 815 "%.*s, while the app was compiled with libunit %s", 816 (int) version_length, unit_init, NXT_VERSION); 817 818 return NXT_UNIT_ERROR; 819 } 820 821 vars = version_end + 1; 822 823 rc = sscanf(vars, 824 "%"PRIu32";" 825 "%"PRId64",%"PRIu32",%d;" 826 "%"PRId64",%"PRIu32",%d;" 827 "%"PRId64",%"PRIu32",%d,%d;" 828 "%d,%"PRIu32, 829 &ready_stream, 830 &ready_pid, &ready_id, &ready_fd, 831 &router_pid, &router_id, &router_fd, 832 &read_pid, &read_id, &read_in_fd, &read_out_fd, 833 log_fd, shm_limit); 834 835 if (nxt_slow_path(rc == EOF)) { 836 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", 837 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); 838 839 return NXT_UNIT_ERROR; 840 } 841 842 if (nxt_slow_path(rc != 13)) { 843 nxt_unit_alert(NULL, "invalid number of variables in %s env: " 844 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); 845 846 return NXT_UNIT_ERROR; 847 } 848 849 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 850 851 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 852 853 ready_port->in_fd = -1; 854 ready_port->out_fd = ready_fd; 855 ready_port->data = NULL; 856 857 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); 858 859 router_port->in_fd = -1; 860 router_port->out_fd = router_fd; 861 router_port->data = NULL; 862 863 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 864 865 read_port->in_fd = read_in_fd; 866 read_port->out_fd = read_out_fd; 867 read_port->data = NULL; 868 869 *stream = ready_stream; 870 871 return NXT_UNIT_OK; 872 } 873 874 875 static int 876 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) 877 { 878 ssize_t res; 879 nxt_port_msg_t msg; 880 nxt_unit_impl_t *lib; 881 882 union { 883 struct cmsghdr cm; 884 char space[CMSG_SPACE(sizeof(int))]; 885 } cmsg; 886 887 lib = nxt_container_of(ctx->unit, 
nxt_unit_impl_t, unit); 888 889 msg.stream = stream; 890 msg.pid = lib->pid; 891 msg.reply_port = 0; 892 msg.type = _NXT_PORT_MSG_PROCESS_READY; 893 msg.last = 1; 894 msg.mmap = 0; 895 msg.nf = 0; 896 msg.mf = 0; 897 msg.tracking = 0; 898 899 memset(&cmsg, 0, sizeof(cmsg)); 900 901 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 902 cmsg.cm.cmsg_level = SOL_SOCKET; 903 cmsg.cm.cmsg_type = SCM_RIGHTS; 904 905 /* 906 * memcpy() is used instead of simple 907 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 908 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 909 * dereferencing type-punned pointer will break strict-aliasing rules 910 * 911 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 912 * in the same simple assignment as in the code above. 913 */ 914 memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); 915 916 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), 917 &cmsg, sizeof(cmsg)); 918 if (res != sizeof(msg)) { 919 return NXT_UNIT_ERROR; 920 } 921 922 return NXT_UNIT_OK; 923 } 924 925 926 static int 927 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 928 nxt_unit_request_info_t **preq) 929 { 930 int rc; 931 pid_t pid; 932 struct cmsghdr *cm; 933 nxt_port_msg_t *port_msg; 934 nxt_unit_impl_t *lib; 935 nxt_unit_recv_msg_t recv_msg; 936 937 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 938 939 recv_msg.fd[0] = -1; 940 recv_msg.fd[1] = -1; 941 port_msg = (nxt_port_msg_t *) rbuf->buf; 942 cm = (struct cmsghdr *) rbuf->oob; 943 944 if (cm->cmsg_level == SOL_SOCKET 945 && cm->cmsg_type == SCM_RIGHTS) 946 { 947 if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { 948 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 949 } 950 951 if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { 952 memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); 953 } 954 } 955 956 recv_msg.incoming_buf = NULL; 957 958 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 959 if (nxt_slow_path(rbuf->size == 0)) { 960 nxt_unit_debug(ctx, "read port 
closed"); 961 962 nxt_unit_quit(ctx); 963 rc = NXT_UNIT_OK; 964 goto done; 965 } 966 967 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); 968 969 rc = NXT_UNIT_ERROR; 970 goto done; 971 } 972 973 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", 974 port_msg->stream, (int) port_msg->type, 975 recv_msg.fd[0], recv_msg.fd[1]); 976 977 recv_msg.stream = port_msg->stream; 978 recv_msg.pid = port_msg->pid; 979 recv_msg.reply_port = port_msg->reply_port; 980 recv_msg.last = port_msg->last; 981 recv_msg.mmap = port_msg->mmap; 982 983 recv_msg.start = port_msg + 1; 984 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); 985 986 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 987 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", 988 port_msg->stream, (int) port_msg->type); 989 rc = NXT_UNIT_ERROR; 990 goto done; 991 } 992 993 /* Fragmentation is unsupported. */ 994 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 995 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", 996 port_msg->stream, (int) port_msg->type); 997 rc = NXT_UNIT_ERROR; 998 goto done; 999 } 1000 1001 if (port_msg->mmap) { 1002 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 1003 1004 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1005 if (rc == NXT_UNIT_AGAIN) { 1006 recv_msg.fd[0] = -1; 1007 recv_msg.fd[1] = -1; 1008 } 1009 1010 goto done; 1011 } 1012 } 1013 1014 switch (port_msg->type) { 1015 1016 case _NXT_PORT_MSG_RPC_READY: 1017 rc = NXT_UNIT_OK; 1018 break; 1019 1020 case _NXT_PORT_MSG_QUIT: 1021 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 1022 1023 nxt_unit_quit(ctx); 1024 rc = NXT_UNIT_OK; 1025 break; 1026 1027 case _NXT_PORT_MSG_NEW_PORT: 1028 rc = nxt_unit_process_new_port(ctx, &recv_msg); 1029 break; 1030 1031 case _NXT_PORT_MSG_PORT_ACK: 1032 rc = nxt_unit_ctx_ready(ctx); 1033 break; 1034 1035 case _NXT_PORT_MSG_CHANGE_FILE: 1036 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 1037 
port_msg->stream, recv_msg.fd[0]); 1038 1039 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { 1040 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", 1041 port_msg->stream, recv_msg.fd[0], lib->log_fd, 1042 strerror(errno), errno); 1043 1044 rc = NXT_UNIT_ERROR; 1045 goto done; 1046 } 1047 1048 rc = NXT_UNIT_OK; 1049 break; 1050 1051 case _NXT_PORT_MSG_MMAP: 1052 if (nxt_slow_path(recv_msg.fd[0] < 0)) { 1053 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 1054 port_msg->stream, recv_msg.fd[0]); 1055 1056 rc = NXT_UNIT_ERROR; 1057 goto done; 1058 } 1059 1060 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); 1061 break; 1062 1063 case _NXT_PORT_MSG_REQ_HEADERS: 1064 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); 1065 break; 1066 1067 case _NXT_PORT_MSG_REQ_BODY: 1068 rc = nxt_unit_process_req_body(ctx, &recv_msg); 1069 break; 1070 1071 case _NXT_PORT_MSG_WEBSOCKET: 1072 rc = nxt_unit_process_websocket(ctx, &recv_msg); 1073 break; 1074 1075 case _NXT_PORT_MSG_REMOVE_PID: 1076 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 1077 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " 1078 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 1079 (int) sizeof(pid)); 1080 1081 rc = NXT_UNIT_ERROR; 1082 goto done; 1083 } 1084 1085 memcpy(&pid, recv_msg.start, sizeof(pid)); 1086 1087 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 1088 port_msg->stream, (int) pid); 1089 1090 nxt_unit_remove_pid(lib, pid); 1091 1092 rc = NXT_UNIT_OK; 1093 break; 1094 1095 case _NXT_PORT_MSG_SHM_ACK: 1096 rc = nxt_unit_process_shm_ack(ctx); 1097 break; 1098 1099 default: 1100 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", 1101 port_msg->stream, (int) port_msg->type); 1102 1103 rc = NXT_UNIT_ERROR; 1104 goto done; 1105 } 1106 1107 done: 1108 1109 if (recv_msg.fd[0] != -1) { 1110 nxt_unit_close(recv_msg.fd[0]); 1111 } 1112 1113 if (recv_msg.fd[1] != -1) { 1114 nxt_unit_close(recv_msg.fd[1]); 1115 } 1116 1117 while 
(recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        /* Poison the released read buffer to catch use-after-release. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handle a NEW_PORT message from the router: validate the message, make the
 * port fd blocking (for regular ports), mmap the shared queue passed in
 * fd[1], and register the port.  For the shared port this also marks the
 * context as ready.  Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* Shared port: incoming only; queue is an app queue. */
        nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);

        new_port.in_fd = recv_msg->fd[0];
        new_port.out_fd = -1;

        mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);

    } else {
        /* Regular port: outgoing only; writes must block, not fail. */
        if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
                          != NXT_UNIT_OK))
        {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                              new_port_msg->id);

        new_port.in_fd = -1;
        new_port.out_fd = recv_msg->fd[0];

        mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);
    }

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /* Ownership of fd[0] transferred to the port; caller must not close it. */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        lib->shared_port = port;

        return nxt_unit_ctx_ready(ctx);
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Mark the context ready and invoke the application's ready_handler,
 * if one is registered.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    ctx_impl->ready = 1;

    if (lib->callbacks.ready_handler) {
        return lib->callbacks.ready_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a request-headers message: allocate a request info object, take
 * ownership of the incoming shared-memory buffers and preread body fd,
 * resolve the response port, ack the headers, and either call the
 * application's request_handler or hand the request back via *preq.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_request_info_t **preq)
{
    int                           res;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Body bytes preread by the router start at preread_content. */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    req_impl->stream = recv_msg->stream;

    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* Take ownership of the spooled-body fd, if any. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;
    req_impl->in_hash = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    /* May return NXT_UNIT_AGAIN if the reply port is not ready yet. */
    res = nxt_unit_request_check_response_port(req, &port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(res == NXT_UNIT_OK)) {
        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            return NXT_UNIT_ERROR;
        }

        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        if
(req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            /* More body to come: index the request by stream id. */
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                return NXT_UNIT_ERROR;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                return NXT_UNIT_OK;
            }
        }

        if (preq == NULL) {
            lib->callbacks.request_handler(req);

        } else {
            *preq = req;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a request-body message: find the request by stream id, append the
 * incoming shared-memory buffers to its content list, and invoke the
 * data_handler (if set) or the request_handler once the body is complete.
 */
static int
nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    uint64_t                 l;
    nxt_unit_impl_t          *lib;
    nxt_unit_mmap_buf_t      *b;
    nxt_unit_request_info_t  *req;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (req == NULL) {
        /* Unknown stream; silently ignore the message. */
        return NXT_UNIT_OK;
    }

    /* l accumulates total body bytes available so far. */
    l = req->content_buf->end - req->content_buf->free;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
        l += b->buf.end - b->buf.free;
    }

    if (recv_msg->incoming_buf != NULL) {
        b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);

        /* Find the tail of the request's current buffer chain. */
        while (b->next != NULL) {
            b = b->next;
        }

        /* "Move" incoming buffer list to req_impl. */
        b->next = recv_msg->incoming_buf;
        b->next->prev = &b->next;

        recv_msg->incoming_buf = NULL;
    }

    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.data_handler != NULL) {
        lib->callbacks.data_handler(req);

        return NXT_UNIT_OK;
    }

    /* No data handler: start the request once the whole body is here. */
    if (req->content_fd != -1 || l == req->content_length) {
        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Resolve the port the response should be sent to.  Returns NXT_UNIT_OK if
 * the port exists and is ready, NXT_UNIT_AGAIN if the request was queued
 * until the port arrives (a port request is sent to the router for a port
 * seen for the first time), or NXT_UNIT_ERROR.  Takes lib->mutex.
 */
static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
    nxt_unit_port_id_t *port_id)
{
    int                           res;
    nxt_unit_ctx_t                *ctx;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_process_t            *process;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_port_impl_t          *port_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
    /* NOTE(review): computed before the NULL check; only dereferenced when
       port != NULL, so the container_of arithmetic on NULL is never used. */
    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    if (nxt_fast_path(port != NULL)) {
        req->response_port = port;

        if (nxt_fast_path(port_impl->ready)) {
            pthread_mutex_unlock(&lib->mutex);

            nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
                           (int) port->id.pid, (int) port->id.id);

            return NXT_UNIT_OK;
        }

        /* Port already requested from the router; park this request. */
        nxt_unit_debug(ctx, "check_response_port: "
                       "port{%d,%d} already requested",
                       (int) port->id.pid, (int) port->id.id);

        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        nxt_queue_insert_tail(&port_impl->awaiting_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&lib->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

        return NXT_UNIT_AGAIN;
    }

    /* First time this port is seen: create a placeholder entry. */
    port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(port_impl == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
                       (int) sizeof(nxt_unit_port_impl_t));

        pthread_mutex_unlock(&lib->mutex);

        return NXT_UNIT_ERROR;
    }

    port = &port_impl->port;

    port->id = *port_id;
    port->in_fd = -1;
    port->out_fd = -1;
    port->data = NULL;

    res = nxt_unit_port_hash_add(&lib->ports, port);
    if (nxt_slow_path(res != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_process_find(lib, port_id->pid, 0);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: process %d not found",
                       port->id.pid);

        /* Third argument 1 removes the just-added hash entry. */
        nxt_unit_port_hash_find(&lib->ports, port_id, 1);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    nxt_queue_insert_tail(&process->ports, &port_impl->link);

    port_impl->process = process;
    port_impl->queue = NULL;
    port_impl->from_socket = 0;
    port_impl->socket_rbuf = NULL;

    nxt_queue_init(&port_impl->awaiting_req);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);

    /* One reference for the hash, one for req->response_port. */
    port_impl->use_count = 2;
    port_impl->ready = 0;

    req->response_port = port;

    pthread_mutex_unlock(&lib->mutex);

    /* Ask the router for the real port fds. */
    res = nxt_unit_get_port(ctx, port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    return NXT_UNIT_AGAIN;
}
/*
 * Send a REQ_HEADERS_ACK message for the request to its response port so the
 * router knows which context/port will produce the response.
 */
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
    ssize_t                       res;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = ctx_impl->read_port->id.id;
    msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;

    res = nxt_unit_port_send(req->ctx, req->response_port,
                             &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a websocket frame message: find the upgraded request by stream id,
 * wrap the payload into a websocket frame object and invoke the
 * websocket_handler; on the last message also invoke close_handler (or
 * finish the request with an error if none is set).
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (nxt_slow_path(req == NULL)) {
        return NXT_UNIT_OK;
    }

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* A frame header is at least 2 bytes. */
    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /* Plain (non-mmap) payload: wrap it into a fresh buffer. */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* Masking key is the last 4 bytes of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header; free now points at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                            ws_impl->ws.header->opcode,
                            ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a shared-memory ack from the router: notify the application via
 * shm_ack_handler so it can retry a previously failed buffer allocation.
 */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->shm_ack_handler != NULL) {
        cb->shm_ack_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Get a request info object for the context: reuse one from the free list
 * or allocate a new one (with request_data_size extra bytes for the
 * application's per-request data).  Takes ctx_impl->mutex.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Drop the lock while allocating. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
                                        + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->req.data = lib->request_data_size ?
req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Return a finished request info object to the context's free list,
 * releasing every resource it still owns: hash entry, outgoing and incoming
 * buffer chains, body fd, and the response port reference.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->in_hash) {
        /* Third argument 1 removes the hash entry. */
        nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
    }

    /* mmap_buf_free() unlinks the head, so these loops terminate. */
    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    if (req->response_port != NULL) {
        nxt_unit_port_release(req->response_port);

        req->response_port = NULL;
    }

    req_impl->state = NXT_UNIT_RS_RELEASED;

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/*
 * Destroy a request info object; the one embedded in the context itself is
 * only unlinked, never freed.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        nxt_unit_free(&ctx_impl->ctx, req_impl);
    }
}


/*
 * Get a websocket frame object: reuse one from the context's free list or
 * allocate a new one.  Takes ctx_impl->mutex.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Return a websocket frame object to its context's free list, freeing any
 * buffers still attached to it.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/* Destroy a websocket frame object (unlink from free list and free). */
static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
    nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    nxt_unit_free(ctx, ws_impl);
}


/*
 * Case-insensitive header-name hash; must produce the same values as the
 * router's parser so field hashes match.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Group duplicate header fields so that same-named fields become adjacent,
 * and record the indexes of Content-Length, Content-Type and Cookie.
 * Field sptr offsets are adjusted as entries are shifted.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /* Hash match is cheap; confirm with a case-insensitive compare. */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        /* Pull any later duplicate of field i up to position i + 1. */
        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /* Shift the gap down; sptr offsets are relative to the entry. */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity.
*/
            nxt_unit_sptr_set(&fields[j].name, name);

            i++;
        }
    }
}


/*
 * Initialize (or re-initialize) the response for a request: allocate a
 * shared-memory buffer sized for the headers area plus max_fields_size
 * bytes of names/values, and set the response status.  An existing big
 * enough response buffer is reused.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        /* Old buffer too small; drop it and start over. */
        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Names/values are appended after the fields array. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);

    req->response_max_fields = max_fields_count;
    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;

    return NXT_UNIT_OK;
}


/*
 * Grow the response: allocate a new buffer with the given limits and copy
 * all non-skipped fields and any piggyback content into it; the old buffer
 * is freed on success, the new one on failure.
 */
int
nxt_unit_response_realloc(nxt_unit_request_info_t *req,
    uint32_t max_fields_count, uint32_t max_fields_size)
{
    char                          *p;
    uint32_t                      i, buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f, *src;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "realloc: response not init");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "realloc: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
        nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");

        return NXT_UNIT_ERROR;
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        nxt_unit_req_warn(req, "realloc: new buf allocation failed");
        return NXT_UNIT_ERROR;
    }

    resp = (nxt_unit_response_t *) buf->start;

    memset(resp, 0, sizeof(nxt_unit_response_t));

    resp->status = req->response->status;
    resp->content_length = req->response->content_length;

    p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
    f = resp->fields;

    for (i = 0; i < req->response->fields_count; i++) {
        src = req->response->fields + i;

        if (nxt_slow_path(src->skip != 0)) {
            continue;
        }

        if (nxt_slow_path(src->name_length + src->value_length + 2
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enough space for field"
                " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
                i, src, src->name_length, src->value_length);

            goto fail;
        }

        nxt_unit_sptr_set(&f->name, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
        *p++ = '\0';

        nxt_unit_sptr_set(&f->value, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
        *p++ = '\0';

        f->hash = src->hash;
        f->skip = 0;
        f->name_length = src->name_length;
        f->value_length = src->value_length;

        resp->fields_count++;
        f++;
    }

    if (req->response->piggyback_content_length > 0) {
        if (nxt_slow_path(req->response->piggyback_content_length
                          > (uint32_t) (buf->end - p)))
        {
            /* NOTE(review): "enought" typo in the message below is in the
               original runtime string; left unchanged here. */
            nxt_unit_req_warn(req, "realloc: not enought space for content"
                " #%"PRIu32", %"PRIu32" required",
                i, req->response->piggyback_content_length);

            goto fail;
        }
        resp->piggyback_content_length =
            req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* Swap in the new buffer; release the old one. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/* Return non-zero if nxt_unit_response_init() was already called. */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
}


/*
 * Append a header field to the response.  The name and value are copied
 * into the response buffer, each 0-terminated.  Fails if the response is
 * not initialized, already sent, field count limit or buffer space is
 * exceeded.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
                          (int) resp->fields_count);

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    /* '+ 2' accounts for the two 0-terminators. */
    if (nxt_slow_path(name_length + value_length + 2
                      > (uint32_t) (buf->end - buf->free)))
    {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Append body bytes to the response buffer ("piggyback" content sent
 * together with the headers).  Fails on state errors or buffer overflow.
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* First chunk: record where the content starts. */
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Send the response headers (and piggyback content) to the router.  A 101
 * response to a websocket handshake upgrades the request first.  On success
 * the response buffer is consumed and the state becomes RESPONSE_SENT.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_free(mmap_buf);
    }

    return rc;
}


/* Return non-zero once the response headers have been sent. */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocate a shared-memory output buffer of the given size for the request;
 * the buffer is linked into the request's outgoing list.  Returns NULL on
 * failure or if size exceeds a single mmap segment.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req,
"response_buf_alloc: %"PRIu32, size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    mmap_buf->req = req;

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   size, size, mmap_buf,
                                   NULL);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_release(mmap_buf);

        nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");

        return NULL;
    }

    return &mmap_buf->buf;
}


/*
 * Get an mmap buffer descriptor: reuse one from the context's free list or
 * allocate a new one.  Takes ctx_impl->mutex.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *mmap_buf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->free_buf == NULL) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(mmap_buf == NULL)) {
            return NULL;
        }

    } else {
        mmap_buf = ctx_impl->free_buf;

        nxt_unit_mmap_buf_unlink(mmap_buf);

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    mmap_buf->ctx_impl = ctx_impl;

    mmap_buf->hdr = NULL;
    mmap_buf->free_ptr = NULL;

    return mmap_buf;
}


/* Return an mmap buffer descriptor to its context's free list. */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_unlink(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* Return non-zero if the request is a websocket handshake. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Upgrade the request to a websocket connection: register it in the request
 * hash (so incoming frames can find it) and force a 101 response status.
 * Idempotent; fails if the response is uninitialized or already sent.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_request_hash_add(req->ctx, req);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


/* Return non-zero if the request has been upgraded to a websocket. */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/*
 * Map the application's per-request data pointer (req->data) back to its
 * owning request info object.
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Send a response body buffer to the router and free it.  Requires that the
 * response headers have already been sent.  Empty buffers are freed without
 * sending.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    return NXT_UNIT_OK;
}


/*
 * Send a final response buffer (last = 1) and release the request on
 * success; on failure finish the request with the error code.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                      rc;
    nxt_unit_mmap_buf_t      *mmap_buf;
    nxt_unit_request_info_t  *req;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
    if (nxt_slow_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(mmap_buf);

        nxt_unit_request_info_release(req);

    } else {
        nxt_unit_request_done(req, rc);
    }
}


static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf =
&mmap_buf->buf; 2596 hdr = mmap_buf->hdr; 2597 2598 m.mmap_msg.size = buf->free - buf->start; 2599 2600 m.msg.stream = req_impl->stream; 2601 m.msg.pid = lib->pid; 2602 m.msg.reply_port = 0; 2603 m.msg.type = _NXT_PORT_MSG_DATA; 2604 m.msg.last = last != 0; 2605 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2606 m.msg.nf = 0; 2607 m.msg.mf = 0; 2608 m.msg.tracking = 0; 2609 2610 rc = NXT_UNIT_ERROR; 2611 2612 if (m.msg.mmap) { 2613 m.mmap_msg.mmap_id = hdr->id; 2614 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2615 (u_char *) buf->start); 2616 2617 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2618 req_impl->stream, 2619 (int) m.mmap_msg.mmap_id, 2620 (int) m.mmap_msg.chunk_id, 2621 (int) m.mmap_msg.size); 2622 2623 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), 2624 NULL, 0); 2625 if (nxt_slow_path(res != sizeof(m))) { 2626 goto free_buf; 2627 } 2628 2629 last_used = (u_char *) buf->free - 1; 2630 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2631 2632 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2633 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2634 2635 buf->start = (char *) first_free; 2636 buf->free = buf->start; 2637 2638 if (buf->end < buf->start) { 2639 buf->end = buf->start; 2640 } 2641 2642 } else { 2643 buf->start = NULL; 2644 buf->free = NULL; 2645 buf->end = NULL; 2646 2647 mmap_buf->hdr = NULL; 2648 } 2649 2650 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, 2651 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2652 2653 nxt_unit_debug(req->ctx, "allocated_chunks %d", 2654 (int) lib->outgoing.allocated_chunks); 2655 2656 } else { 2657 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2658 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2659 { 2660 nxt_unit_alert(req->ctx, 2661 "#%"PRIu32": failed to send plain memory buffer" 2662 ": no space reserved for message header", 2663 req_impl->stream); 2664 2665 goto free_buf; 2666 } 2667 2668 
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); 2669 2670 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", 2671 req_impl->stream, 2672 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2673 2674 res = nxt_unit_port_send(req->ctx, req->response_port, 2675 buf->start - sizeof(m.msg), 2676 m.mmap_msg.size + sizeof(m.msg), 2677 NULL, 0); 2678 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2679 goto free_buf; 2680 } 2681 } 2682 2683 rc = NXT_UNIT_OK; 2684 2685 free_buf: 2686 2687 nxt_unit_free_outgoing_buf(mmap_buf); 2688 2689 return rc; 2690 } 2691 2692 2693 void 2694 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2695 { 2696 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2697 } 2698 2699 2700 static void 2701 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2702 { 2703 nxt_unit_free_outgoing_buf(mmap_buf); 2704 2705 nxt_unit_mmap_buf_release(mmap_buf); 2706 } 2707 2708 2709 static void 2710 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2711 { 2712 if (mmap_buf->hdr != NULL) { 2713 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2714 mmap_buf->hdr, mmap_buf->buf.start, 2715 mmap_buf->buf.end - mmap_buf->buf.start); 2716 2717 mmap_buf->hdr = NULL; 2718 2719 return; 2720 } 2721 2722 if (mmap_buf->free_ptr != NULL) { 2723 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); 2724 2725 mmap_buf->free_ptr = NULL; 2726 } 2727 } 2728 2729 2730 static nxt_unit_read_buf_t * 2731 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2732 { 2733 nxt_unit_ctx_impl_t *ctx_impl; 2734 nxt_unit_read_buf_t *rbuf; 2735 2736 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2737 2738 pthread_mutex_lock(&ctx_impl->mutex); 2739 2740 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2741 2742 pthread_mutex_unlock(&ctx_impl->mutex); 2743 2744 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 2745 2746 return rbuf; 2747 } 2748 2749 2750 static nxt_unit_read_buf_t * 2751 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 
2752 { 2753 nxt_queue_link_t *link; 2754 nxt_unit_read_buf_t *rbuf; 2755 2756 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2757 link = nxt_queue_first(&ctx_impl->free_rbuf); 2758 nxt_queue_remove(link); 2759 2760 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); 2761 2762 return rbuf; 2763 } 2764 2765 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); 2766 2767 if (nxt_fast_path(rbuf != NULL)) { 2768 rbuf->ctx_impl = ctx_impl; 2769 } 2770 2771 return rbuf; 2772 } 2773 2774 2775 static void 2776 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2777 nxt_unit_read_buf_t *rbuf) 2778 { 2779 nxt_unit_ctx_impl_t *ctx_impl; 2780 2781 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2782 2783 pthread_mutex_lock(&ctx_impl->mutex); 2784 2785 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); 2786 2787 pthread_mutex_unlock(&ctx_impl->mutex); 2788 } 2789 2790 2791 nxt_unit_buf_t * 2792 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2793 { 2794 nxt_unit_mmap_buf_t *mmap_buf; 2795 2796 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2797 2798 if (mmap_buf->next == NULL) { 2799 return NULL; 2800 } 2801 2802 return &mmap_buf->next->buf; 2803 } 2804 2805 2806 uint32_t 2807 nxt_unit_buf_max(void) 2808 { 2809 return PORT_MMAP_DATA_SIZE; 2810 } 2811 2812 2813 uint32_t 2814 nxt_unit_buf_min(void) 2815 { 2816 return PORT_MMAP_CHUNK_SIZE; 2817 } 2818 2819 2820 int 2821 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2822 size_t size) 2823 { 2824 ssize_t res; 2825 2826 res = nxt_unit_response_write_nb(req, start, size, size); 2827 2828 return res < 0 ? 
-res : NXT_UNIT_OK; 2829 } 2830 2831 2832 ssize_t 2833 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, 2834 size_t size, size_t min_size) 2835 { 2836 int rc; 2837 ssize_t sent; 2838 uint32_t part_size, min_part_size, buf_size; 2839 const char *part_start; 2840 nxt_unit_mmap_buf_t mmap_buf; 2841 nxt_unit_request_info_impl_t *req_impl; 2842 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2843 2844 nxt_unit_req_debug(req, "write: %d", (int) size); 2845 2846 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2847 2848 part_start = start; 2849 sent = 0; 2850 2851 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2852 nxt_unit_req_alert(req, "write: response not initialized yet"); 2853 2854 return -NXT_UNIT_ERROR; 2855 } 2856 2857 /* Check if response is not send yet. */ 2858 if (nxt_slow_path(req->response_buf != NULL)) { 2859 part_size = req->response_buf->end - req->response_buf->free; 2860 part_size = nxt_min(size, part_size); 2861 2862 rc = nxt_unit_response_add_content(req, part_start, part_size); 2863 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2864 return -rc; 2865 } 2866 2867 rc = nxt_unit_response_send(req); 2868 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2869 return -rc; 2870 } 2871 2872 size -= part_size; 2873 part_start += part_size; 2874 sent += part_size; 2875 2876 min_size -= nxt_min(min_size, part_size); 2877 } 2878 2879 while (size > 0) { 2880 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2881 min_part_size = nxt_min(min_size, part_size); 2882 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); 2883 2884 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, 2885 min_part_size, &mmap_buf, local_buf); 2886 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2887 return -rc; 2888 } 2889 2890 buf_size = mmap_buf.buf.end - mmap_buf.buf.free; 2891 if (nxt_slow_path(buf_size == 0)) { 2892 return sent; 2893 } 2894 part_size = nxt_min(buf_size, part_size); 2895 2896 mmap_buf.buf.free = 
nxt_cpymem(mmap_buf.buf.free, 2897 part_start, part_size); 2898 2899 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2900 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2901 return -rc; 2902 } 2903 2904 size -= part_size; 2905 part_start += part_size; 2906 sent += part_size; 2907 2908 min_size -= nxt_min(min_size, part_size); 2909 } 2910 2911 return sent; 2912 } 2913 2914 2915 int 2916 nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 2917 nxt_unit_read_info_t *read_info) 2918 { 2919 int rc; 2920 ssize_t n; 2921 uint32_t buf_size; 2922 nxt_unit_buf_t *buf; 2923 nxt_unit_mmap_buf_t mmap_buf; 2924 nxt_unit_request_info_impl_t *req_impl; 2925 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2926 2927 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2928 2929 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2930 nxt_unit_req_alert(req, "write: response not initialized yet"); 2931 2932 return NXT_UNIT_ERROR; 2933 } 2934 2935 /* Check if response is not send yet. */ 2936 if (nxt_slow_path(req->response_buf != NULL)) { 2937 2938 /* Enable content in headers buf. */ 2939 rc = nxt_unit_response_add_content(req, "", 0); 2940 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2941 nxt_unit_req_error(req, "Failed to add piggyback content"); 2942 2943 return rc; 2944 } 2945 2946 buf = req->response_buf; 2947 2948 while (buf->end - buf->free > 0) { 2949 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2950 if (nxt_slow_path(n < 0)) { 2951 nxt_unit_req_error(req, "Read error"); 2952 2953 return NXT_UNIT_ERROR; 2954 } 2955 2956 /* Manually increase sizes. 
*/ 2957 buf->free += n; 2958 req->response->piggyback_content_length += n; 2959 2960 if (read_info->eof) { 2961 break; 2962 } 2963 } 2964 2965 rc = nxt_unit_response_send(req); 2966 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2967 nxt_unit_req_error(req, "Failed to send headers with content"); 2968 2969 return rc; 2970 } 2971 2972 if (read_info->eof) { 2973 return NXT_UNIT_OK; 2974 } 2975 } 2976 2977 while (!read_info->eof) { 2978 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 2979 read_info->buf_size); 2980 2981 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 2982 2983 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2984 buf_size, buf_size, 2985 &mmap_buf, local_buf); 2986 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2987 return rc; 2988 } 2989 2990 buf = &mmap_buf.buf; 2991 2992 while (!read_info->eof && buf->end > buf->free) { 2993 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2994 if (nxt_slow_path(n < 0)) { 2995 nxt_unit_req_error(req, "Read error"); 2996 2997 nxt_unit_free_outgoing_buf(&mmap_buf); 2998 2999 return NXT_UNIT_ERROR; 3000 } 3001 3002 buf->free += n; 3003 } 3004 3005 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 3006 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3007 nxt_unit_req_error(req, "Failed to send content"); 3008 3009 return rc; 3010 } 3011 } 3012 3013 return NXT_UNIT_OK; 3014 } 3015 3016 3017 ssize_t 3018 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 3019 { 3020 ssize_t buf_res, res; 3021 3022 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 3023 dst, size); 3024 3025 if (buf_res < (ssize_t) size && req->content_fd != -1) { 3026 res = read(req->content_fd, dst, size); 3027 if (nxt_slow_path(res < 0)) { 3028 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 3029 strerror(errno), errno); 3030 3031 return res; 3032 } 3033 3034 if (res < (ssize_t) size) { 3035 nxt_unit_close(req->content_fd); 3036 3037 req->content_fd = -1; 3038 } 3039 3040 
req->content_length -= res; 3041 size -= res; 3042 3043 dst = nxt_pointer_to(dst, res); 3044 3045 } else { 3046 res = 0; 3047 } 3048 3049 return buf_res + res; 3050 } 3051 3052 3053 ssize_t 3054 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 3055 { 3056 char *p; 3057 size_t l_size, b_size; 3058 nxt_unit_buf_t *b; 3059 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 3060 3061 if (req->content_length == 0) { 3062 return 0; 3063 } 3064 3065 l_size = 0; 3066 3067 b = req->content_buf; 3068 3069 while (b != NULL) { 3070 b_size = b->end - b->free; 3071 p = memchr(b->free, '\n', b_size); 3072 3073 if (p != NULL) { 3074 p++; 3075 l_size += p - b->free; 3076 break; 3077 } 3078 3079 l_size += b_size; 3080 3081 if (max_size <= l_size) { 3082 break; 3083 } 3084 3085 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 3086 if (mmap_buf->next == NULL 3087 && req->content_fd != -1 3088 && l_size < req->content_length) 3089 { 3090 preread_buf = nxt_unit_request_preread(req, 16384); 3091 if (nxt_slow_path(preread_buf == NULL)) { 3092 return -1; 3093 } 3094 3095 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 3096 } 3097 3098 b = nxt_unit_buf_next(b); 3099 } 3100 3101 return nxt_min(max_size, l_size); 3102 } 3103 3104 3105 static nxt_unit_mmap_buf_t * 3106 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) 3107 { 3108 ssize_t res; 3109 nxt_unit_mmap_buf_t *mmap_buf; 3110 3111 if (req->content_fd == -1) { 3112 nxt_unit_req_alert(req, "preread: content_fd == -1"); 3113 return NULL; 3114 } 3115 3116 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 3117 if (nxt_slow_path(mmap_buf == NULL)) { 3118 nxt_unit_req_alert(req, "preread: failed to allocate buf"); 3119 return NULL; 3120 } 3121 3122 mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size); 3123 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3124 nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); 3125 nxt_unit_mmap_buf_release(mmap_buf); 3126 return NULL; 3127 } 
    /* Plain malloc-backed buffer: reserve no header space (read-only data). */
    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (res < (ssize_t) size) {
        /* Short read: the spool file is exhausted. */
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Trim the buffer to the bytes actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copy up to 'size' bytes from the buffer chain '*b' into 'dst',
 * advancing the chain and decrementing the remaining length '*len'.
 * Returns the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            /* Only step past a buffer that was fully consumed. */
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Finish the request.  On success, sends a default 200 "text/plain"
 * response if none was initialized, then sends the final (last) buffer;
 * otherwise notifies the router with a plain DATA/RPC_ERROR message and
 * releases the request.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends with 'last' set and releases the request on success. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    (void) nxt_unit_port_send(req->ctx, req->response_port,
                              &msg, sizeof(msg), NULL, 0);

    nxt_unit_request_info_release(req);
}


/* Send a single websocket frame from one contiguous region. */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Send a websocket frame gathered from 'iovcnt' iovec regions, splitting
 * the payload over multiple shared-memory buffers when it exceeds
 * PORT_MMAP_DATA_SIZE.  Only the first buffer carries the frame header.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size, alloc_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_unit_mmap_buf_t     mmap_buf;
    nxt_websocket_header_t  *wh;
    char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    /* 10 bytes is the maximum websocket frame header size. */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    /* Zero the first two header bytes before setting bit-fields. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* Current buffer is full: flush it and allocate the next. */
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
    }

    return rc;
}


/*
 * Read up to 'size' bytes of websocket frame payload, unmasking the bytes
 * in place when the frame is masked.  'd' is the payload offset of the
 * first byte just read, used to pick the right mask rotation.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[(i + d) % 4];
    }

    return res;
}


/*
 * Detach a websocket frame from its incoming shared-memory buffer by
 * copying it into private malloc'ed storage, so it can outlive the message
 * processing cycle.  No-op if the frame already owns its memory.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, hsize;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = nxt_unit_malloc(ws->req->ctx, size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    hsize = nxt_websocket_frame_header_size(b);

    /* Re-point all frame cursors into the private copy. */
    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b + hsize;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    ws_impl->ws.header = (nxt_websocket_header_t *) b;

    if (ws_impl->ws.header->mask) {
        /* The 4-byte mask is the last part of the frame header. */
        ws_impl->ws.mask = (uint8_t *) b + hsize - 4;

    } else {
        ws_impl->ws.mask = NULL;
    }

    return NXT_UNIT_OK;
}


/* Release a fully processed websocket frame. */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Find (or create) an outgoing shared-memory segment with at least 'min_n'
 * and up to '*n' contiguous free chunks for the given port; returns the
 * segment header and the first chunk id in '*c', updating '*n' to the
 * number of chunks actually reserved.  Handles the out-of-shared-memory
 * (OOSM) protocol with the router when the mmap limit is reached.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->outgoing.mutex);

retry:

    outgoing_size = lib->outgoing.size;

    mm_end = lib->outgoing.elts + outgoing_size;

    for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Skip segments bound to another port or another sending thread. */
        if (hdr->sent_over != 0xFFFFu
            && (hdr->sent_over != port->id.id
                ||
                   mm->src_thread != pthread_self()))
        {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to grow the run of busy-marked chunks up to '*n'. */
            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run interrupted; enough only if it covers min_n. */
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Roll back this run and restart after the gap. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment had no suitable run; ask the router to collect it. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&lib->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by the application, but not sent to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition.  Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&lib->outgoing.mutex);

        goto retry;
    }

    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, port, *n);

unlock:

    nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "allocated_chunks %d",
                   (int) lib->outgoing.allocated_chunks);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    return hdr;
}


/* Notify the router that this process is out of shared memory (OOSM). */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Block until the router acknowledges that shared memory was released.
 * Unrelated messages received meanwhile are queued on pending_rbuf for
 * later processing; a QUIT message aborts the wait.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  res;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_ERROR) {
            nxt_unit_read_buf_release(ctx, rbuf);

            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_shm_ack(rbuf)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }

        /* Not an ACK: keep the message for the normal processing loop. */
        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        if (nxt_unit_is_quit(rbuf)) {
            nxt_unit_debug(ctx, "oosm: quit received");

            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Return the mmap array element at index 'i', growing the array
 * (geometrically) as needed.  Caller holds the mmaps mutex.  Returns NULL
 * if realloc fails.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t         cap, n;
    nxt_unit_mmap_t  *e;

    if (nxt_fast_path(mmaps->size > i)) {
        return mmaps->elts + i;
    }

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
        if (nxt_slow_path(e == NULL)) {
            return NULL;
        }

        mmaps->elts = e;

        /* Initialize the newly added elements. */
        for (n = mmaps->cap; n < cap; n++) {
            e = mmaps->elts + n;

            e->hdr = NULL;
            nxt_queue_init(&e->awaiting_rbuf);
        }

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Create a new outgoing shared-memory segment, map it, reserve its first
 * 'n' chunks and pass the descriptor to the router.  Called and returns
 * with the outgoing mutex held; the mutex is dropped around the port send.
 * Returns the mapped header or NULL on failure.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
               fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All chunks start out free (all bits set in the bitmaps). */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = lib->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = port->id.pid;
    hdr->sent_over = port->id.id;
    mm->src_thread = pthread_self();

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) port->id.pid);
    }

    nxt_unit_close(fd);

    pthread_mutex_lock(&lib->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    lib->outgoing.size--;

    return NULL;
}


/*
 * Create an anonymous shared-memory file descriptor of the given size,
 * using the best mechanism available: memfd_create, shm_open(SHM_ANON)
 * or a named-then-unlinked shm_open.  Returns -1 on failure.
 */
static int
nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
{
    int              fd;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
    char    name[64];

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());
#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        return -1;
    }

    nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
                       strerror(errno), errno);

        return -1;
    }

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink(name);

    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        return -1;
    }

    if (nxt_slow_path(shm_unlink(name) == -1)) {
        nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
                       strerror(errno), errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        return -1;
    }

    return fd;
}


/*
 * Pass a shared-memory file descriptor to the peer process over the port
 * using an SCM_RIGHTS control message.
 */
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_MMAP;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    /*
     * Fill all padding fields with 0.
     * Code in Go 1.11 validates cmsghdr using the padding field as part of
     * len.  See Cmsghdr definition and socketControlMessageHeaderAndData
     * function.
     */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
                             &cmsg, sizeof(cmsg));
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Set up an outgoing buffer of up to 'size' bytes (at least 'min_size').
 * Small buffers are "plain": either the caller-supplied local_buf or a
 * malloc'ed area, with space for the port message header reserved before
 * buf.start.  Larger buffers are carved from shared-memory chunks.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    uint32_t size, uint32_t min_size,
    nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
    int                     nchunks, min_nchunks;
    nxt_chunk_id_t          c;
    nxt_port_mmap_header_t  *hdr;

    if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
        if (local_buf != NULL) {
            mmap_buf->free_ptr = NULL;
            mmap_buf->plain_ptr = local_buf;

        } else {
            mmap_buf->free_ptr = nxt_unit_malloc(ctx,
                                             size + sizeof(nxt_port_msg_t));
            if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
                return NXT_UNIT_ERROR;
            }

            mmap_buf->plain_ptr = mmap_buf->free_ptr;
        }

        mmap_buf->hdr = NULL;
        mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
        mmap_buf->buf.free = mmap_buf->buf.start;
        mmap_buf->buf.end = mmap_buf->buf.start + size;

        nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
                       mmap_buf->buf.start, (int) size);

        return NXT_UNIT_OK;
    }

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) /
PORT_MMAP_CHUNK_SIZE; 3938 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3939 3940 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3941 if (nxt_slow_path(hdr == NULL)) { 3942 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3943 mmap_buf->hdr = NULL; 3944 mmap_buf->buf.start = NULL; 3945 mmap_buf->buf.free = NULL; 3946 mmap_buf->buf.end = NULL; 3947 mmap_buf->free_ptr = NULL; 3948 3949 return NXT_UNIT_OK; 3950 } 3951 3952 return NXT_UNIT_ERROR; 3953 } 3954 3955 mmap_buf->hdr = hdr; 3956 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3957 mmap_buf->buf.free = mmap_buf->buf.start; 3958 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3959 mmap_buf->free_ptr = NULL; 3960 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3961 3962 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3963 (int) hdr->id, (int) c, 3964 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3965 3966 return NXT_UNIT_OK; 3967 } 3968 3969 3970 static int 3971 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3972 { 3973 int rc; 3974 void *mem; 3975 nxt_queue_t awaiting_rbuf; 3976 struct stat mmap_stat; 3977 nxt_unit_mmap_t *mm; 3978 nxt_unit_impl_t *lib; 3979 nxt_unit_ctx_impl_t *ctx_impl; 3980 nxt_unit_read_buf_t *rbuf; 3981 nxt_port_mmap_header_t *hdr; 3982 3983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3984 3985 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3986 3987 if (fstat(fd, &mmap_stat) == -1) { 3988 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3989 strerror(errno), errno); 3990 3991 return NXT_UNIT_ERROR; 3992 } 3993 3994 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3995 MAP_SHARED, fd, 0); 3996 if (nxt_slow_path(mem == MAP_FAILED)) { 3997 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3998 strerror(errno), errno); 3999 4000 return NXT_UNIT_ERROR; 4001 } 4002 4003 
hdr = mem; 4004 4005 if (nxt_slow_path(hdr->src_pid != pid)) { 4006 4007 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " 4008 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 4009 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 4010 4011 munmap(mem, PORT_MMAP_SIZE); 4012 4013 return NXT_UNIT_ERROR; 4014 } 4015 4016 nxt_queue_init(&awaiting_rbuf); 4017 4018 pthread_mutex_lock(&lib->incoming.mutex); 4019 4020 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); 4021 if (nxt_slow_path(mm == NULL)) { 4022 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); 4023 4024 munmap(mem, PORT_MMAP_SIZE); 4025 4026 rc = NXT_UNIT_ERROR; 4027 4028 } else { 4029 mm->hdr = hdr; 4030 4031 hdr->sent_over = 0xFFFFu; 4032 4033 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 4034 nxt_queue_init(&mm->awaiting_rbuf); 4035 4036 rc = NXT_UNIT_OK; 4037 } 4038 4039 pthread_mutex_unlock(&lib->incoming.mutex); 4040 4041 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 4042 4043 ctx_impl = rbuf->ctx_impl; 4044 4045 pthread_mutex_lock(&ctx_impl->mutex); 4046 4047 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 4048 4049 pthread_mutex_unlock(&ctx_impl->mutex); 4050 4051 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 4052 4053 nxt_unit_awake_ctx(ctx, ctx_impl); 4054 4055 } nxt_queue_loop; 4056 4057 return rc; 4058 } 4059 4060 4061 static void 4062 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) 4063 { 4064 nxt_port_msg_t msg; 4065 4066 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { 4067 return; 4068 } 4069 4070 if (nxt_slow_path(ctx_impl->read_port == NULL 4071 || ctx_impl->read_port->out_fd == -1)) 4072 { 4073 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); 4074 4075 return; 4076 } 4077 4078 memset(&msg, 0, sizeof(nxt_port_msg_t)); 4079 4080 msg.type = _NXT_PORT_MSG_RPC_READY; 4081 4082 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 4083 &msg, sizeof(msg), NULL, 0); 4084 } 4085 

/* Initialize an (incoming or outgoing) mmaps collection to empty. */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
    mmaps->allocated_chunks = 0;
}


/* Take a reference on 'process'. */
nxt_inline void
nxt_unit_process_use(nxt_unit_process_t *process)
{
    nxt_atomic_fetch_add(&process->use_count, 1);
}


/* Drop a reference on 'process'; free it when the last one is gone. */
nxt_inline void
nxt_unit_process_release(nxt_unit_process_t *process)
{
    long c;

    c = nxt_atomic_fetch_add(&process->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);

        nxt_unit_free(NULL, process);
    }
}


/* Unmap every segment in 'mmaps' and release the array and its mutex. */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        nxt_unit_free(NULL, mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Resolve incoming mmap 'id' into *hdr.  Called with mmaps->mutex held;
 * the lock stays held only on the NXT_UNIT_OK path — it is released on
 * both ERROR and AGAIN.  If the segment is not yet known, the rbuf is
 * parked on the slot's awaiting_rbuf queue, the context's wait_items
 * counter is bumped, and (for the first waiter) the router is asked to
 * re-send the mmap fd; the caller then sees NXT_UNIT_AGAIN.
 */
static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
    pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
    nxt_unit_read_buf_t *rbuf)
{
    int                  res, need_rbuf;
    nxt_unit_mmap_t      *mm;
    nxt_unit_ctx_impl_t  *ctx_impl;

    mm = nxt_unit_mmap_at(mmaps, id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate mmap");

        pthread_mutex_unlock(&mmaps->mutex);

        *hdr = NULL;

        return NXT_UNIT_ERROR;
    }

    *hdr = mm->hdr;

    if (nxt_fast_path(*hdr != NULL)) {
        return NXT_UNIT_OK;
    }

    /* Only the first waiter triggers the GET_MMAP request. */
    need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);

    nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);

    pthread_mutex_unlock(&mmaps->mutex);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    if (need_rbuf) {
        res = nxt_unit_get_mmap(ctx, pid, id);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_AGAIN;
}


/*
 * Translate a message that carries nxt_port_mmap_msg_t references into a
 * chain of nxt_unit_mmap_buf_t pointing straight into the incoming shared
 * memory.  The first reference also replaces recv_msg->start/size so the
 * payload can be read in place.  May return NXT_UNIT_AGAIN via
 * nxt_unit_check_rbuf_mmap() if a referenced mmap is not attached yet.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_read_buf_t *rbuf)
{
    int                     res;
    void                    *start;
    uint32_t                size;
    nxt_unit_impl_t         *lib;
    nxt_unit_mmaps_t        *mmaps;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* Allocating buffer structures. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            /* Undo all buffers allocated so far. */
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mmaps = &lib->incoming;

    pthread_mutex_lock(&mmaps->mutex);

    for (; mmap_msg < end; mmap_msg++) {
        res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
                                       recv_msg->pid, mmap_msg->mmap_id,
                                       &hdr, rbuf);

        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            /* check_rbuf_mmap() already released mmaps->mutex here. */
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return res;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* The first fragment becomes the message payload itself. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&mmaps->mutex);

    return NXT_UNIT_OK;
}


/*
 * Ask the router to (re-)send the fd of incoming mmap 'id' belonging to
 * process 'pid'.  The reply is delivered to this context's read port.
 */
static int
nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_mmap_t  get_mmap;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_MMAP;

    m.get_mmap.id = id;

    nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Return the chunks [start, start+size) to the segment's free map.
 * The freed region is poisoned with 0xA5 to catch use-after-free.  For
 * outgoing segments the global allocated_chunks counter is decreased; for
 * incoming ones an SHM_ACK is sent if the producer flagged out-of-shm.
 */
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
    void *start, uint32_t size)
{
    int              freed_chunks;
    u_char           *p, *end;
    nxt_chunk_id_t   c;
    nxt_unit_impl_t  *lib;

    memset(start, 0xA5, size);

    p = start;
    end = p + size;
    c = nxt_port_mmap_chunk_id(hdr, p);
    freed_chunks = 0;

    while (p < end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
        freed_chunks++;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (hdr->src_pid == lib->pid && freed_chunks != 0) {
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);

        nxt_unit_debug(ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);
    }

    /* Notify a starved producer exactly once per oosm flag. */
    if (hdr->dst_pid == lib->pid
        && freed_chunks != 0
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        nxt_unit_send_shm_ack(ctx, hdr->src_pid);
    }
}


/*
 * Send an _NXT_PORT_MSG_SHM_ACK to the router, telling the producer that
 * shared memory has been freed on our side.
 */
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_SHM_ACK;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/* lvlhsh test callback: match a process entry by pid key. */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_process_t  *process;

    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/* Fill an lvlhsh query keyed by pid for the processes hash. */
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
    lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
    lhq->key.length = sizeof(*pid);
    lhq->key.start = (u_char *) pid;
    lhq->proto = &lvlhsh_processes_proto;
}


/*
 * Find-or-create the process entry for 'pid' in lib->processes, returning
 * it with an extra reference (a new entry starts at use_count 2: one for
 * the hash, one for the caller).  NOTE(review): callers appear to hold
 * lib->mutex around this — confirm; no lock is taken here.
 */
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
        process = lhq.value;
        nxt_unit_process_use(process);

        return process;
    }

    process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);

        return NULL;
    }

    process->pid = pid;
    process->use_count = 2;
    process->next_port_id = 0;
    process->lib = lib;

    nxt_queue_init(&process->ports);

    lhq.replace = 0;
    lhq.value = process;

    switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {

    case NXT_OK:
        break;

    default:
        nxt_unit_alert(ctx, "process %d insert failed", (int) pid);

        nxt_unit_free(ctx, process);
        process = NULL;
        break;
    }

    return process;
}


/*
 * Look up (and optionally remove) the process entry for 'pid'.  A plain
 * find returns the entry with an extra reference; a remove hands the
 * hash's own reference over to the caller.
 */
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
    int                 rc;
    nxt_lvlhsh_query_t  lhq;

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (remove) {
        rc = nxt_lvlhsh_delete(&lib->processes, &lhq);

    } else {
        rc = nxt_lvlhsh_find(&lib->processes, &lhq);
    }

    if (rc == NXT_OK) {
        if (!remove) {
            nxt_unit_process_use(lhq.value);
        }

        return lhq.value;
    }

    return NULL;
}


/* Remove and return an arbitrary process entry (used during teardown). */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
}


/*
 * Public API: run the event loop on 'ctx' until the context goes offline
 * or an error occurs.  On error the whole application is asked to quit.
 */
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rc = nxt_unit_run_once_impl(ctx);

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_quit(ctx);
            break;
        }
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Public API: process a single message on 'ctx'. */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_run_once_impl(ctx);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * One loop iteration: read one message, process it, then drain any
 * pending read buffers and newly ready requests.
 */
static int
nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_read_buf(ctx, rbuf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_read_buf_release(ctx, rbuf);

        return rc;
    }

    /* process_msg() owns (and eventually releases) rbuf from here on. */
    rc = nxt_unit_process_msg(ctx, rbuf, NULL);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    return rc;
}


/*
 * Read the next message for 'ctx'.  When the context is waiting for
 * something (wait_items > 0) or not yet ready, read only from its own
 * port.  Otherwise try the per-port queue first, then the shared app
 * queue, and finally poll() both fds until one becomes readable.
 */
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                   nevents, res, err;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port_impl;
    struct pollfd         fds[2];

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
        return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
    }

    port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
                                 port);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

retry:

    if (port_impl->from_socket == 0) {
        res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_OK) {
            if (nxt_unit_is_read_socket(rbuf)) {
                /* A real message awaits on the socket; read it below. */
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               port_impl->from_socket);

            } else {
                nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               (int) rbuf->size);

                return NXT_UNIT_OK;
            }
        }
    }

    res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (res == NXT_UNIT_OK) {
        return NXT_UNIT_OK;
    }

    fds[0].fd = ctx_impl->read_port->in_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    fds[1].fd = lib->shared_port->in_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    nevents = poll(fds, 2, -1);
    if (nxt_slow_path(nevents == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
                       fds[0].fd, fds[1].fd, strerror(err), err);

        rbuf->size = -1;

        return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    if ((fds[0].revents & POLLIN) != 0) {
        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    if ((fds[1].revents & POLLIN) != 0) {
        res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    return NXT_UNIT_ERROR;
}


/*
 * Process all read buffers parked on ctx_impl->pending_rbuf.  The list is
 * detached under the lock and processed outside of it.  After the first
 * error the remaining buffers are only released, not processed.
 */
static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_queue_t          pending_rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return NXT_UNIT_OK;
    }

    nxt_queue_init(&pending_rbuf);

    nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->pending_rbuf);

    pthread_mutex_unlock(&ctx_impl->mutex);

    rc = NXT_UNIT_OK;

    nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link)
    {

        if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
            rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);

        } else {
            nxt_unit_read_buf_release(ctx, rbuf);
        }

    } nxt_queue_loop;

    return rc;
}


/*
 * Dispatch requests whose response port became usable: ACK the headers,
 * register requests that still expect more body in the request hash, and
 * invoke the application's request handler.
 */
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
    int                           res;
    nxt_queue_t                   ready_req;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return;
    }

    nxt_queue_init(&ready_req);

    /* Detach the list under the lock; run callbacks outside of it. */
    nxt_queue_add(&ready_req, &ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->ready_req);

    pthread_mutex_unlock(&ctx_impl->mutex);

    nxt_queue_each(req_impl, &ready_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

        req = &req_impl->req;

        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            continue;
        }

        /* More body bytes expected than fit in the current buffer? */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                continue;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                continue;
            }
        }

        lib->callbacks.request_handler(&req_impl->req);

    } nxt_queue_loop;
}


/*
 * Public API: run a worker loop reading only from this context's own
 * port (used by dedicated per-context threads).
 */
int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        rc = nxt_unit_process_msg(ctx, rbuf, NULL);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        rc = nxt_unit_process_pending_rbuf(ctx);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        nxt_unit_process_ready_req(ctx);
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Check whether rbuf holds a bare READ_QUEUE control message. */
nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
    }

    return 0;
}


/* Check whether rbuf is the 1-byte READ_SOCKET marker. */
nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
{
    if (nxt_fast_path(rbuf->size == 1)) {
        return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
    }

    return 0;
}


/* Check whether rbuf holds a bare SHM_ACK control message. */
nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
    }

    return 0;
}


/* Check whether rbuf holds a bare QUIT control message. */
nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_QUIT;
    }

    return 0;
}


/*
 * Public API: run a worker loop reading only from the shared application
 * queue/port (any idle worker may pick up any request).
 */
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }

        rc = nxt_unit_process_msg(ctx, rbuf, NULL);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Public API: non-blocking dequeue of one request from the shared app
 * queue.  Returns the parsed request or NULL when nothing is available
 * (or the context is offline / out of buffers).
 */
nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
{
    int                      rc;
    nxt_unit_impl_t          *lib;
    nxt_unit_read_buf_t      *rbuf;
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_request_info_t  *req;

    nxt_unit_ctx_use(ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    req = NULL;

    if (nxt_slow_path(!ctx_impl->online)) {
        goto done;
    }

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        goto done;
    }

    rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (rc != NXT_UNIT_OK) {
        nxt_unit_read_buf_release(ctx, rbuf);
        goto done;
    }

    (void) nxt_unit_process_msg(ctx, rbuf, &req);

done:

    nxt_unit_ctx_release(ctx);

    return req;
}


/* Public API: is 'ctx' the library's main context? */
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    return (ctx == &lib->main_ctx.ctx);
}


/* Public API: drain and process messages available on 'port'. */
int
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_process_port_msg_impl(ctx, port);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Keep receiving from 'port' (shared or context-private) and processing
 * messages until the receive would block or the context goes offline.
 */
static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

retry:

    if (port == lib->shared_port) {
        rc = nxt_unit_shared_port_recv(ctx, port, rbuf);

    } else {
        rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
    }

    if (rc != NXT_UNIT_OK) {
        nxt_unit_read_buf_release(ctx, rbuf);
        return rc;
    }

    rc = nxt_unit_process_msg(ctx, rbuf, NULL);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    if (ctx_impl->online) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        goto retry;
    }

    return rc;
}


/* Public API: release the caller's reference on 'ctx'. */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_release(ctx);
}


/*
 * Public API: create an additional context (e.g. for a new worker
 * thread): allocate it, create its read port plus shared-memory message
 * queue, and announce the port to the router.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                   rc, queue_fd;
    void                  *mem;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *port;
    nxt_unit_ctx_impl_t   *new_ctx;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
                              + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_free(ctx, new_ctx);

        return NULL;
    }

    queue_fd = -1;

    port = nxt_unit_create_port(&new_ctx->ctx);
    if (nxt_slow_path(port == NULL)) {
        goto fail;
    }

    new_ctx->read_port = port;

    queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    port_impl->queue = mem;

    rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    nxt_unit_close(queue_fd);

    return &new_ctx->ctx;

fail:

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* Releasing the last reference tears the context down. */
    nxt_unit_ctx_release(&new_ctx->ctx);

    return NULL;
}


/*
 * Final teardown of a context: fail still-active requests, free all
 * cached buffers/requests/frames, unlink from the library list, release
 * the read port, free the structure (unless it is the embedded main
 * context) and drop the context's reference on the library.
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_read_buf_t              *rbuf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        nxt_unit_free(&ctx_impl->ctx, mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);

    } nxt_queue_loop;

    nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
    {
        /* The embedded read buf is part of ctx_impl; don't free it. */
        if (rbuf != &ctx_impl->ctx_read_buf) {
            nxt_unit_free(&ctx_impl->ctx, rbuf);
        }
    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_remove(&ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
        nxt_unit_port_release(ctx_impl->read_port);
    }

    if (ctx_impl != &lib->main_ctx) {
        nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/* Public API: fill a port id (pid, id) and its precomputed hash. */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Create a new in-process port backed by a socketpair(): socket [0] is
 * the receiving end, [1] the sending end.  The port id is allocated from
 * this process's next_port_id counter under lib->mutex.
 * (Definition continues beyond this chunk.)
 */
static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port, *port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NULL;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);

        return NULL;
    }

    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    new_port.in_fd = port_sockets[0];
    new_port.out_fd = port_sockets[1];
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    nxt_unit_process_release(process);

    port = nxt_unit_add_port(ctx, &new_port, NULL);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_close(port_sockets[0]);
5287 nxt_unit_close(port_sockets[1]); 5288 } 5289 5290 return port; 5291 } 5292 5293 5294 static int 5295 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 5296 nxt_unit_port_t *port, int queue_fd) 5297 { 5298 ssize_t res; 5299 nxt_unit_impl_t *lib; 5300 int fds[2] = { port->out_fd, queue_fd }; 5301 5302 struct { 5303 nxt_port_msg_t msg; 5304 nxt_port_msg_new_port_t new_port; 5305 } m; 5306 5307 union { 5308 struct cmsghdr cm; 5309 char space[CMSG_SPACE(sizeof(int) * 2)]; 5310 } cmsg; 5311 5312 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5313 5314 m.msg.stream = 0; 5315 m.msg.pid = lib->pid; 5316 m.msg.reply_port = 0; 5317 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 5318 m.msg.last = 0; 5319 m.msg.mmap = 0; 5320 m.msg.nf = 0; 5321 m.msg.mf = 0; 5322 m.msg.tracking = 0; 5323 5324 m.new_port.id = port->id.id; 5325 m.new_port.pid = port->id.pid; 5326 m.new_port.type = NXT_PROCESS_APP; 5327 m.new_port.max_size = 16 * 1024; 5328 m.new_port.max_share = 64 * 1024; 5329 5330 memset(&cmsg, 0, sizeof(cmsg)); 5331 5332 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); 5333 cmsg.cm.cmsg_level = SOL_SOCKET; 5334 cmsg.cm.cmsg_type = SCM_RIGHTS; 5335 5336 /* 5337 * memcpy() is used instead of simple 5338 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 5339 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 5340 * dereferencing type-punned pointer will break strict-aliasing rules 5341 * 5342 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 5343 * in the same simple assignment as in the code above. 5344 */ 5345 memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); 5346 5347 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); 5348 5349 return (res == sizeof(m)) ? 
NXT_UNIT_OK : NXT_UNIT_ERROR; 5350 } 5351 5352 5353 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) 5354 { 5355 nxt_unit_port_impl_t *port_impl; 5356 5357 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5358 5359 nxt_atomic_fetch_add(&port_impl->use_count, 1); 5360 } 5361 5362 5363 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) 5364 { 5365 long c; 5366 nxt_unit_port_impl_t *port_impl; 5367 5368 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5369 5370 c = nxt_atomic_fetch_add(&port_impl->use_count, -1); 5371 5372 if (c == 1) { 5373 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d", 5374 (int) port->id.pid, (int) port->id.id, 5375 port->in_fd, port->out_fd); 5376 5377 nxt_unit_process_release(port_impl->process); 5378 5379 if (port->in_fd != -1) { 5380 nxt_unit_close(port->in_fd); 5381 5382 port->in_fd = -1; 5383 } 5384 5385 if (port->out_fd != -1) { 5386 nxt_unit_close(port->out_fd); 5387 5388 port->out_fd = -1; 5389 } 5390 5391 if (port_impl->queue != NULL) { 5392 munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID) 5393 ? 
sizeof(nxt_app_queue_t) 5394 : sizeof(nxt_port_queue_t)); 5395 } 5396 5397 nxt_unit_free(NULL, port_impl); 5398 } 5399 } 5400 5401 5402 static nxt_unit_port_t * 5403 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) 5404 { 5405 int rc, ready; 5406 nxt_queue_t awaiting_req; 5407 nxt_unit_impl_t *lib; 5408 nxt_unit_port_t *old_port; 5409 nxt_unit_process_t *process; 5410 nxt_unit_port_impl_t *new_port, *old_port_impl; 5411 5412 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5413 5414 pthread_mutex_lock(&lib->mutex); 5415 5416 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); 5417 5418 if (nxt_slow_path(old_port != NULL)) { 5419 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " 5420 "in_fd %d out_fd %d queue %p", 5421 port->id.pid, port->id.id, 5422 port->in_fd, port->out_fd, queue); 5423 5424 if (old_port->data == NULL) { 5425 old_port->data = port->data; 5426 port->data = NULL; 5427 } 5428 5429 if (old_port->in_fd == -1) { 5430 old_port->in_fd = port->in_fd; 5431 port->in_fd = -1; 5432 } 5433 5434 if (port->in_fd != -1) { 5435 nxt_unit_close(port->in_fd); 5436 port->in_fd = -1; 5437 } 5438 5439 if (old_port->out_fd == -1) { 5440 old_port->out_fd = port->out_fd; 5441 port->out_fd = -1; 5442 } 5443 5444 if (port->out_fd != -1) { 5445 nxt_unit_close(port->out_fd); 5446 port->out_fd = -1; 5447 } 5448 5449 *port = *old_port; 5450 5451 nxt_queue_init(&awaiting_req); 5452 5453 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); 5454 5455 if (old_port_impl->queue == NULL) { 5456 old_port_impl->queue = queue; 5457 } 5458 5459 ready = (port->in_fd != -1 || port->out_fd != -1); 5460 5461 /* 5462 * Port can be market as 'ready' only after callbacks.add_port() call. 5463 * Otherwise, request may try to use the port before callback. 
5464 */ 5465 if (lib->callbacks.add_port == NULL && ready) { 5466 old_port_impl->ready = ready; 5467 5468 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { 5469 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); 5470 nxt_queue_init(&old_port_impl->awaiting_req); 5471 } 5472 } 5473 5474 pthread_mutex_unlock(&lib->mutex); 5475 5476 if (lib->callbacks.add_port != NULL && ready) { 5477 lib->callbacks.add_port(ctx, old_port); 5478 5479 pthread_mutex_lock(&lib->mutex); 5480 5481 old_port_impl->ready = ready; 5482 5483 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { 5484 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); 5485 nxt_queue_init(&old_port_impl->awaiting_req); 5486 } 5487 5488 pthread_mutex_unlock(&lib->mutex); 5489 } 5490 5491 nxt_unit_process_awaiting_req(ctx, &awaiting_req); 5492 5493 return old_port; 5494 } 5495 5496 new_port = NULL; 5497 ready = 0; 5498 5499 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", 5500 port->id.pid, port->id.id, 5501 port->in_fd, port->out_fd, queue); 5502 5503 process = nxt_unit_process_get(ctx, port->id.pid); 5504 if (nxt_slow_path(process == NULL)) { 5505 goto unlock; 5506 } 5507 5508 if (port->id.id != NXT_UNIT_SHARED_PORT_ID 5509 && port->id.id >= process->next_port_id) 5510 { 5511 process->next_port_id = port->id.id + 1; 5512 } 5513 5514 new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); 5515 if (nxt_slow_path(new_port == NULL)) { 5516 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", 5517 port->id.pid, port->id.id); 5518 5519 goto unlock; 5520 } 5521 5522 new_port->port = *port; 5523 5524 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 5525 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 5526 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", 5527 port->id.pid, port->id.id); 5528 5529 nxt_unit_free(ctx, new_port); 5530 5531 new_port = NULL; 5532 5533 goto unlock; 5534 } 5535 5536 nxt_queue_insert_tail(&process->ports, &new_port->link); 5537 
5538 new_port->use_count = 2; 5539 new_port->process = process; 5540 new_port->queue = queue; 5541 new_port->from_socket = 0; 5542 new_port->socket_rbuf = NULL; 5543 5544 nxt_queue_init(&new_port->awaiting_req); 5545 5546 ready = (port->in_fd != -1 || port->out_fd != -1); 5547 5548 if (lib->callbacks.add_port == NULL) { 5549 new_port->ready = ready; 5550 5551 } else { 5552 new_port->ready = 0; 5553 } 5554 5555 process = NULL; 5556 5557 unlock: 5558 5559 pthread_mutex_unlock(&lib->mutex); 5560 5561 if (nxt_slow_path(process != NULL)) { 5562 nxt_unit_process_release(process); 5563 } 5564 5565 if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { 5566 lib->callbacks.add_port(ctx, &new_port->port); 5567 5568 nxt_queue_init(&awaiting_req); 5569 5570 pthread_mutex_lock(&lib->mutex); 5571 5572 new_port->ready = 1; 5573 5574 if (!nxt_queue_is_empty(&new_port->awaiting_req)) { 5575 nxt_queue_add(&awaiting_req, &new_port->awaiting_req); 5576 nxt_queue_init(&new_port->awaiting_req); 5577 } 5578 5579 pthread_mutex_unlock(&lib->mutex); 5580 5581 nxt_unit_process_awaiting_req(ctx, &awaiting_req); 5582 } 5583 5584 return (new_port == NULL) ? 
NULL : &new_port->port; 5585 } 5586 5587 5588 static void 5589 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) 5590 { 5591 nxt_unit_ctx_impl_t *ctx_impl; 5592 nxt_unit_request_info_impl_t *req_impl; 5593 5594 nxt_queue_each(req_impl, awaiting_req, 5595 nxt_unit_request_info_impl_t, port_wait_link) 5596 { 5597 nxt_queue_remove(&req_impl->port_wait_link); 5598 5599 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, 5600 ctx); 5601 5602 pthread_mutex_lock(&ctx_impl->mutex); 5603 5604 nxt_queue_insert_tail(&ctx_impl->ready_req, 5605 &req_impl->port_wait_link); 5606 5607 pthread_mutex_unlock(&ctx_impl->mutex); 5608 5609 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 5610 5611 nxt_unit_awake_ctx(ctx, ctx_impl); 5612 5613 } nxt_queue_loop; 5614 } 5615 5616 5617 static void 5618 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 5619 { 5620 nxt_unit_port_t *port; 5621 nxt_unit_port_impl_t *port_impl; 5622 5623 pthread_mutex_lock(&lib->mutex); 5624 5625 port = nxt_unit_remove_port_unsafe(lib, port_id); 5626 5627 if (nxt_fast_path(port != NULL)) { 5628 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5629 5630 nxt_queue_remove(&port_impl->link); 5631 } 5632 5633 pthread_mutex_unlock(&lib->mutex); 5634 5635 if (lib->callbacks.remove_port != NULL && port != NULL) { 5636 lib->callbacks.remove_port(&lib->unit, port); 5637 } 5638 5639 if (nxt_fast_path(port != NULL)) { 5640 nxt_unit_port_release(port); 5641 } 5642 } 5643 5644 5645 static nxt_unit_port_t * 5646 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 5647 { 5648 nxt_unit_port_t *port; 5649 5650 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 5651 if (nxt_slow_path(port == NULL)) { 5652 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", 5653 (int) port_id->pid, (int) port_id->id); 5654 5655 return NULL; 5656 } 5657 5658 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", 5659 
(int) port_id->pid, (int) port_id->id, 5660 port->in_fd, port->out_fd, port->data); 5661 5662 return port; 5663 } 5664 5665 5666 static void 5667 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) 5668 { 5669 nxt_unit_process_t *process; 5670 5671 pthread_mutex_lock(&lib->mutex); 5672 5673 process = nxt_unit_process_find(lib, pid, 1); 5674 if (nxt_slow_path(process == NULL)) { 5675 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); 5676 5677 pthread_mutex_unlock(&lib->mutex); 5678 5679 return; 5680 } 5681 5682 nxt_unit_remove_process(lib, process); 5683 5684 if (lib->callbacks.remove_pid != NULL) { 5685 lib->callbacks.remove_pid(&lib->unit, pid); 5686 } 5687 } 5688 5689 5690 static void 5691 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) 5692 { 5693 nxt_queue_t ports; 5694 nxt_unit_port_impl_t *port; 5695 5696 nxt_queue_init(&ports); 5697 5698 nxt_queue_add(&ports, &process->ports); 5699 5700 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 5701 5702 nxt_unit_remove_port_unsafe(lib, &port->port.id); 5703 5704 } nxt_queue_loop; 5705 5706 pthread_mutex_unlock(&lib->mutex); 5707 5708 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 5709 5710 nxt_queue_remove(&port->link); 5711 5712 if (lib->callbacks.remove_port != NULL) { 5713 lib->callbacks.remove_port(&lib->unit, &port->port); 5714 } 5715 5716 nxt_unit_port_release(&port->port); 5717 5718 } nxt_queue_loop; 5719 5720 nxt_unit_process_release(process); 5721 } 5722 5723 5724 static void 5725 nxt_unit_quit(nxt_unit_ctx_t *ctx) 5726 { 5727 nxt_port_msg_t msg; 5728 nxt_unit_impl_t *lib; 5729 nxt_unit_ctx_impl_t *ctx_impl; 5730 nxt_unit_callbacks_t *cb; 5731 nxt_unit_request_info_t *req; 5732 nxt_unit_request_info_impl_t *req_impl; 5733 5734 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5735 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5736 5737 if (!ctx_impl->online) { 5738 return; 5739 } 5740 5741 ctx_impl->online = 0; 5742 
5743 cb = &lib->callbacks; 5744 5745 if (cb->quit != NULL) { 5746 cb->quit(ctx); 5747 } 5748 5749 nxt_queue_each(req_impl, &ctx_impl->active_req, 5750 nxt_unit_request_info_impl_t, link) 5751 { 5752 req = &req_impl->req; 5753 5754 nxt_unit_req_warn(req, "active request on ctx quit"); 5755 5756 if (cb->close_handler) { 5757 nxt_unit_req_debug(req, "close_handler"); 5758 5759 cb->close_handler(req); 5760 5761 } else { 5762 nxt_unit_request_done(req, NXT_UNIT_ERROR); 5763 } 5764 5765 } nxt_queue_loop; 5766 5767 if (ctx != &lib->main_ctx.ctx) { 5768 return; 5769 } 5770 5771 memset(&msg, 0, sizeof(nxt_port_msg_t)); 5772 5773 msg.pid = lib->pid; 5774 msg.type = _NXT_PORT_MSG_QUIT; 5775 5776 pthread_mutex_lock(&lib->mutex); 5777 5778 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { 5779 5780 if (ctx == &ctx_impl->ctx 5781 || ctx_impl->read_port == NULL 5782 || ctx_impl->read_port->out_fd == -1) 5783 { 5784 continue; 5785 } 5786 5787 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 5788 &msg, sizeof(msg), NULL, 0); 5789 5790 } nxt_queue_loop; 5791 5792 pthread_mutex_unlock(&lib->mutex); 5793 } 5794 5795 5796 static int 5797 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 5798 { 5799 ssize_t res; 5800 nxt_unit_impl_t *lib; 5801 nxt_unit_ctx_impl_t *ctx_impl; 5802 5803 struct { 5804 nxt_port_msg_t msg; 5805 nxt_port_msg_get_port_t get_port; 5806 } m; 5807 5808 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5809 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 5810 5811 memset(&m.msg, 0, sizeof(nxt_port_msg_t)); 5812 5813 m.msg.pid = lib->pid; 5814 m.msg.reply_port = ctx_impl->read_port->id.id; 5815 m.msg.type = _NXT_PORT_MSG_GET_PORT; 5816 5817 m.get_port.id = port_id->id; 5818 m.get_port.pid = port_id->pid; 5819 5820 nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, 5821 (int) port_id->id); 5822 5823 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); 5824 if (nxt_slow_path(res 
!= sizeof(m))) { 5825 return NXT_UNIT_ERROR; 5826 } 5827 5828 return NXT_UNIT_OK; 5829 } 5830 5831 5832 static ssize_t 5833 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 5834 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 5835 { 5836 int notify; 5837 ssize_t ret; 5838 nxt_int_t rc; 5839 nxt_port_msg_t msg; 5840 nxt_unit_impl_t *lib; 5841 nxt_unit_port_impl_t *port_impl; 5842 5843 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 5844 5845 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5846 if (port_impl->queue != NULL && oob_size == 0 5847 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) 5848 { 5849 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); 5850 if (nxt_slow_path(rc != NXT_OK)) { 5851 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", 5852 (int) port->id.pid, (int) port->id.id); 5853 5854 return -1; 5855 } 5856 5857 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", 5858 (int) port->id.pid, (int) port->id.id, 5859 (int) buf_size, notify); 5860 5861 if (notify) { 5862 memcpy(&msg, buf, sizeof(nxt_port_msg_t)); 5863 5864 msg.type = _NXT_PORT_MSG_READ_QUEUE; 5865 5866 if (lib->callbacks.port_send == NULL) { 5867 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, 5868 sizeof(nxt_port_msg_t), NULL, 0); 5869 5870 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", 5871 (int) port->id.pid, (int) port->id.id, 5872 (int) ret); 5873 5874 } else { 5875 ret = lib->callbacks.port_send(ctx, port, &msg, 5876 sizeof(nxt_port_msg_t), NULL, 0); 5877 5878 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", 5879 (int) port->id.pid, (int) port->id.id, 5880 (int) ret); 5881 } 5882 5883 } 5884 5885 return buf_size; 5886 } 5887 5888 if (port_impl->queue != NULL) { 5889 msg.type = _NXT_PORT_MSG_READ_SOCKET; 5890 5891 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); 5892 if (nxt_slow_path(rc != NXT_OK)) { 5893 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", 5894 (int) 
port->id.pid, (int) port->id.id); 5895 5896 return -1; 5897 } 5898 5899 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", 5900 (int) port->id.pid, (int) port->id.id, notify); 5901 } 5902 5903 if (lib->callbacks.port_send != NULL) { 5904 ret = lib->callbacks.port_send(ctx, port, buf, buf_size, 5905 oob, oob_size); 5906 5907 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", 5908 (int) port->id.pid, (int) port->id.id, 5909 (int) ret); 5910 5911 } else { 5912 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, 5913 oob, oob_size); 5914 5915 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", 5916 (int) port->id.pid, (int) port->id.id, 5917 (int) ret); 5918 } 5919 5920 return ret; 5921 } 5922 5923 5924 static ssize_t 5925 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 5926 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 5927 { 5928 int err; 5929 ssize_t res; 5930 struct iovec iov[1]; 5931 struct msghdr msg; 5932 5933 iov[0].iov_base = (void *) buf; 5934 iov[0].iov_len = buf_size; 5935 5936 msg.msg_name = NULL; 5937 msg.msg_namelen = 0; 5938 msg.msg_iov = iov; 5939 msg.msg_iovlen = 1; 5940 msg.msg_flags = 0; 5941 msg.msg_control = (void *) oob; 5942 msg.msg_controllen = oob_size; 5943 5944 retry: 5945 5946 res = sendmsg(fd, &msg, 0); 5947 5948 if (nxt_slow_path(res == -1)) { 5949 err = errno; 5950 5951 if (err == EINTR) { 5952 goto retry; 5953 } 5954 5955 /* 5956 * FIXME: This should be "alert" after router graceful shutdown 5957 * implementation. 
5958 */ 5959 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", 5960 fd, (int) buf_size, strerror(err), err); 5961 5962 } else { 5963 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, 5964 (int) res); 5965 } 5966 5967 return res; 5968 } 5969 5970 5971 static int 5972 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 5973 nxt_unit_read_buf_t *rbuf) 5974 { 5975 int res, read; 5976 nxt_unit_port_impl_t *port_impl; 5977 5978 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 5979 5980 read = 0; 5981 5982 retry: 5983 5984 if (port_impl->from_socket > 0) { 5985 if (port_impl->socket_rbuf != NULL 5986 && port_impl->socket_rbuf->size > 0) 5987 { 5988 port_impl->from_socket--; 5989 5990 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); 5991 port_impl->socket_rbuf->size = 0; 5992 5993 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", 5994 (int) port->id.pid, (int) port->id.id, 5995 (int) rbuf->size); 5996 5997 return NXT_UNIT_OK; 5998 } 5999 6000 } else { 6001 res = nxt_unit_port_queue_recv(port, rbuf); 6002 6003 if (res == NXT_UNIT_OK) { 6004 if (nxt_unit_is_read_socket(rbuf)) { 6005 port_impl->from_socket++; 6006 6007 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", 6008 (int) port->id.pid, (int) port->id.id, 6009 port_impl->from_socket); 6010 6011 goto retry; 6012 } 6013 6014 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", 6015 (int) port->id.pid, (int) port->id.id, 6016 (int) rbuf->size); 6017 6018 return NXT_UNIT_OK; 6019 } 6020 } 6021 6022 if (read) { 6023 return NXT_UNIT_AGAIN; 6024 } 6025 6026 res = nxt_unit_port_recv(ctx, port, rbuf); 6027 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 6028 return NXT_UNIT_ERROR; 6029 } 6030 6031 read = 1; 6032 6033 if (nxt_unit_is_read_queue(rbuf)) { 6034 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", 6035 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 6036 6037 goto retry; 6038 } 6039 6040 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", 6041 (int) 
port->id.pid, (int) port->id.id, 6042 (int) rbuf->size); 6043 6044 if (res == NXT_UNIT_AGAIN) { 6045 return NXT_UNIT_AGAIN; 6046 } 6047 6048 if (port_impl->from_socket > 0) { 6049 port_impl->from_socket--; 6050 6051 return NXT_UNIT_OK; 6052 } 6053 6054 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", 6055 (int) port->id.pid, (int) port->id.id, 6056 (int) rbuf->size); 6057 6058 if (port_impl->socket_rbuf == NULL) { 6059 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); 6060 6061 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { 6062 return NXT_UNIT_ERROR; 6063 } 6064 6065 port_impl->socket_rbuf->size = 0; 6066 } 6067 6068 if (port_impl->socket_rbuf->size > 0) { 6069 nxt_unit_alert(ctx, "too many port socket messages"); 6070 6071 return NXT_UNIT_ERROR; 6072 } 6073 6074 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); 6075 6076 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 6077 6078 goto retry; 6079 } 6080 6081 6082 nxt_inline void 6083 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) 6084 { 6085 memcpy(dst->buf, src->buf, src->size); 6086 dst->size = src->size; 6087 memcpy(dst->oob, src->oob, sizeof(src->oob)); 6088 } 6089 6090 6091 static int 6092 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 6093 nxt_unit_read_buf_t *rbuf) 6094 { 6095 int res; 6096 nxt_unit_port_impl_t *port_impl; 6097 6098 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 6099 6100 retry: 6101 6102 res = nxt_unit_app_queue_recv(port, rbuf); 6103 6104 if (res == NXT_UNIT_AGAIN) { 6105 res = nxt_unit_port_recv(ctx, port, rbuf); 6106 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 6107 return NXT_UNIT_ERROR; 6108 } 6109 6110 if (nxt_unit_is_read_queue(rbuf)) { 6111 nxt_app_queue_notification_received(port_impl->queue); 6112 6113 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", 6114 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 6115 6116 goto retry; 6117 } 6118 } 6119 6120 return res; 6121 } 6122 6123 6124 static int 
6125 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 6126 nxt_unit_read_buf_t *rbuf) 6127 { 6128 int fd, err; 6129 struct iovec iov[1]; 6130 struct msghdr msg; 6131 nxt_unit_impl_t *lib; 6132 6133 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 6134 6135 if (lib->callbacks.port_recv != NULL) { 6136 rbuf->size = lib->callbacks.port_recv(ctx, port, 6137 rbuf->buf, sizeof(rbuf->buf), 6138 rbuf->oob, sizeof(rbuf->oob)); 6139 6140 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", 6141 (int) port->id.pid, (int) port->id.id, (int) rbuf->size); 6142 6143 if (nxt_slow_path(rbuf->size < 0)) { 6144 return NXT_UNIT_ERROR; 6145 } 6146 6147 return NXT_UNIT_OK; 6148 } 6149 6150 iov[0].iov_base = rbuf->buf; 6151 iov[0].iov_len = sizeof(rbuf->buf); 6152 6153 msg.msg_name = NULL; 6154 msg.msg_namelen = 0; 6155 msg.msg_iov = iov; 6156 msg.msg_iovlen = 1; 6157 msg.msg_flags = 0; 6158 msg.msg_control = rbuf->oob; 6159 msg.msg_controllen = sizeof(rbuf->oob); 6160 6161 fd = port->in_fd; 6162 6163 retry: 6164 6165 rbuf->size = recvmsg(fd, &msg, 0); 6166 6167 if (nxt_slow_path(rbuf->size == -1)) { 6168 err = errno; 6169 6170 if (err == EINTR) { 6171 goto retry; 6172 } 6173 6174 if (err == EAGAIN) { 6175 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", 6176 fd, strerror(err), err); 6177 6178 return NXT_UNIT_AGAIN; 6179 } 6180 6181 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", 6182 fd, strerror(err), err); 6183 6184 return NXT_UNIT_ERROR; 6185 } 6186 6187 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); 6188 6189 return NXT_UNIT_OK; 6190 } 6191 6192 6193 static int 6194 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) 6195 { 6196 nxt_unit_port_impl_t *port_impl; 6197 6198 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 6199 6200 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); 6201 6202 return (rbuf->size == -1) ? 
NXT_UNIT_AGAIN : NXT_UNIT_OK; 6203 } 6204 6205 6206 static int 6207 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) 6208 { 6209 uint32_t cookie; 6210 nxt_port_msg_t *port_msg; 6211 nxt_app_queue_t *queue; 6212 nxt_unit_port_impl_t *port_impl; 6213 6214 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 6215 queue = port_impl->queue; 6216 6217 retry: 6218 6219 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); 6220 6221 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); 6222 6223 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { 6224 port_msg = (nxt_port_msg_t *) rbuf->buf; 6225 6226 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { 6227 return NXT_UNIT_OK; 6228 } 6229 6230 nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); 6231 6232 goto retry; 6233 } 6234 6235 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; 6236 } 6237 6238 6239 nxt_inline int 6240 nxt_unit_close(int fd) 6241 { 6242 int res; 6243 6244 res = close(fd); 6245 6246 if (nxt_slow_path(res == -1)) { 6247 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)", 6248 fd, strerror(errno), errno); 6249 6250 } else { 6251 nxt_unit_debug(NULL, "close(%d): %d", fd, res); 6252 } 6253 6254 return res; 6255 } 6256 6257 6258 static int 6259 nxt_unit_fd_blocking(int fd) 6260 { 6261 int nb; 6262 6263 nb = 0; 6264 6265 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { 6266 nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", 6267 fd, strerror(errno), errno); 6268 6269 return NXT_UNIT_ERROR; 6270 } 6271 6272 return NXT_UNIT_OK; 6273 } 6274 6275 6276 static nxt_int_t 6277 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 6278 { 6279 nxt_unit_port_t *port; 6280 nxt_unit_port_hash_id_t *port_id; 6281 6282 port = data; 6283 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start; 6284 6285 if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t) 6286 && port_id->pid == port->id.pid 6287 && port_id->id == port->id.id) 6288 
{ 6289 return NXT_OK; 6290 } 6291 6292 return NXT_DECLINED; 6293 } 6294 6295 6296 static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { 6297 NXT_LVLHSH_DEFAULT, 6298 nxt_unit_port_hash_test, 6299 nxt_unit_lvlhsh_alloc, 6300 nxt_unit_lvlhsh_free, 6301 }; 6302 6303 6304 static inline void 6305 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq, 6306 nxt_unit_port_hash_id_t *port_hash_id, 6307 nxt_unit_port_id_t *port_id) 6308 { 6309 port_hash_id->pid = port_id->pid; 6310 port_hash_id->id = port_id->id; 6311 6312 if (nxt_fast_path(port_id->hash != 0)) { 6313 lhq->key_hash = port_id->hash; 6314 6315 } else { 6316 lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id)); 6317 6318 port_id->hash = lhq->key_hash; 6319 6320 nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X", 6321 (int) port_id->pid, (int) port_id->id, 6322 (int) port_id->hash); 6323 } 6324 6325 lhq->key.length = sizeof(nxt_unit_port_hash_id_t); 6326 lhq->key.start = (u_char *) port_hash_id; 6327 lhq->proto = &lvlhsh_ports_proto; 6328 lhq->pool = NULL; 6329 } 6330 6331 6332 static int 6333 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) 6334 { 6335 nxt_int_t res; 6336 nxt_lvlhsh_query_t lhq; 6337 nxt_unit_port_hash_id_t port_hash_id; 6338 6339 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id); 6340 lhq.replace = 0; 6341 lhq.value = port; 6342 6343 res = nxt_lvlhsh_insert(port_hash, &lhq); 6344 6345 switch (res) { 6346 6347 case NXT_OK: 6348 return NXT_UNIT_OK; 6349 6350 default: 6351 return NXT_UNIT_ERROR; 6352 } 6353 } 6354 6355 6356 static nxt_unit_port_t * 6357 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, 6358 int remove) 6359 { 6360 nxt_int_t res; 6361 nxt_lvlhsh_query_t lhq; 6362 nxt_unit_port_hash_id_t port_hash_id; 6363 6364 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id); 6365 6366 if (remove) { 6367 res = nxt_lvlhsh_delete(port_hash, &lhq); 6368 6369 } else { 6370 res = 
nxt_lvlhsh_find(port_hash, &lhq); 6371 } 6372 6373 switch (res) { 6374 6375 case NXT_OK: 6376 if (!remove) { 6377 nxt_unit_port_use(lhq.value); 6378 } 6379 6380 return lhq.value; 6381 6382 default: 6383 return NULL; 6384 } 6385 } 6386 6387 6388 static nxt_int_t 6389 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 6390 { 6391 return NXT_OK; 6392 } 6393 6394 6395 static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { 6396 NXT_LVLHSH_DEFAULT, 6397 nxt_unit_request_hash_test, 6398 nxt_unit_lvlhsh_alloc, 6399 nxt_unit_lvlhsh_free, 6400 }; 6401 6402 6403 static int 6404 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 6405 nxt_unit_request_info_t *req) 6406 { 6407 uint32_t *stream; 6408 nxt_int_t res; 6409 nxt_lvlhsh_query_t lhq; 6410 nxt_unit_ctx_impl_t *ctx_impl; 6411 nxt_unit_request_info_impl_t *req_impl; 6412 6413 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 6414 if (req_impl->in_hash) { 6415 return NXT_UNIT_OK; 6416 } 6417 6418 stream = &req_impl->stream; 6419 6420 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 6421 lhq.key.length = sizeof(*stream); 6422 lhq.key.start = (u_char *) stream; 6423 lhq.proto = &lvlhsh_requests_proto; 6424 lhq.pool = NULL; 6425 lhq.replace = 0; 6426 lhq.value = req_impl; 6427 6428 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 6429 6430 pthread_mutex_lock(&ctx_impl->mutex); 6431 6432 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); 6433 6434 pthread_mutex_unlock(&ctx_impl->mutex); 6435 6436 switch (res) { 6437 6438 case NXT_OK: 6439 req_impl->in_hash = 1; 6440 return NXT_UNIT_OK; 6441 6442 default: 6443 return NXT_UNIT_ERROR; 6444 } 6445 } 6446 6447 6448 static nxt_unit_request_info_t * 6449 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) 6450 { 6451 nxt_int_t res; 6452 nxt_lvlhsh_query_t lhq; 6453 nxt_unit_ctx_impl_t *ctx_impl; 6454 nxt_unit_request_info_impl_t *req_impl; 6455 6456 lhq.key_hash = nxt_murmur_hash2(&stream, 
sizeof(stream)); 6457 lhq.key.length = sizeof(stream); 6458 lhq.key.start = (u_char *) &stream; 6459 lhq.proto = &lvlhsh_requests_proto; 6460 lhq.pool = NULL; 6461 6462 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 6463 6464 pthread_mutex_lock(&ctx_impl->mutex); 6465 6466 if (remove) { 6467 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); 6468 6469 } else { 6470 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); 6471 } 6472 6473 pthread_mutex_unlock(&ctx_impl->mutex); 6474 6475 switch (res) { 6476 6477 case NXT_OK: 6478 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, 6479 req); 6480 if (remove) { 6481 req_impl->in_hash = 0; 6482 } 6483 6484 return lhq.value; 6485 6486 default: 6487 return NULL; 6488 } 6489 } 6490 6491 6492 void 6493 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) 6494 { 6495 int log_fd, n; 6496 char msg[NXT_MAX_ERROR_STR], *p, *end; 6497 pid_t pid; 6498 va_list ap; 6499 nxt_unit_impl_t *lib; 6500 6501 if (nxt_fast_path(ctx != NULL)) { 6502 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 6503 6504 pid = lib->pid; 6505 log_fd = lib->log_fd; 6506 6507 } else { 6508 pid = getpid(); 6509 log_fd = STDERR_FILENO; 6510 } 6511 6512 p = msg; 6513 end = p + sizeof(msg) - 1; 6514 6515 p = nxt_unit_snprint_prefix(p, end, pid, level); 6516 6517 va_start(ap, fmt); 6518 p += vsnprintf(p, end - p, fmt, ap); 6519 va_end(ap); 6520 6521 if (nxt_slow_path(p > end)) { 6522 memcpy(end - 5, "[...]", 5); 6523 p = end; 6524 } 6525 6526 *p++ = '\n'; 6527 6528 n = write(log_fd, msg, p - msg); 6529 if (nxt_slow_path(n < 0)) { 6530 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg); 6531 } 6532 } 6533 6534 6535 void 6536 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) 
6537 { 6538 int log_fd, n; 6539 char msg[NXT_MAX_ERROR_STR], *p, *end; 6540 pid_t pid; 6541 va_list ap; 6542 nxt_unit_impl_t *lib; 6543 nxt_unit_request_info_impl_t *req_impl; 6544 6545 if (nxt_fast_path(req != NULL)) { 6546 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 6547 6548 pid = lib->pid; 6549 log_fd = lib->log_fd; 6550 6551 } else { 6552 pid = getpid(); 6553 log_fd = STDERR_FILENO; 6554 } 6555 6556 p = msg; 6557 end = p + sizeof(msg) - 1; 6558 6559 p = nxt_unit_snprint_prefix(p, end, pid, level); 6560 6561 if (nxt_fast_path(req != NULL)) { 6562 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 6563 6564 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); 6565 } 6566 6567 va_start(ap, fmt); 6568 p += vsnprintf(p, end - p, fmt, ap); 6569 va_end(ap); 6570 6571 if (nxt_slow_path(p > end)) { 6572 memcpy(end - 5, "[...]", 5); 6573 p = end; 6574 } 6575 6576 *p++ = '\n'; 6577 6578 n = write(log_fd, msg, p - msg); 6579 if (nxt_slow_path(n < 0)) { 6580 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg); 6581 } 6582 } 6583 6584 6585 static const char * nxt_unit_log_levels[] = { 6586 "alert", 6587 "error", 6588 "warn", 6589 "notice", 6590 "info", 6591 "debug", 6592 }; 6593 6594 6595 static char * 6596 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 6597 { 6598 struct tm tm; 6599 struct timespec ts; 6600 6601 (void) clock_gettime(CLOCK_REALTIME, &ts); 6602 6603 #if (NXT_HAVE_LOCALTIME_R) 6604 (void) localtime_r(&ts.tv_sec, &tm); 6605 #else 6606 tm = *localtime(&ts.tv_sec); 6607 #endif 6608 6609 #if (NXT_DEBUG) 6610 p += snprintf(p, end - p, 6611 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 6612 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6613 tm.tm_hour, tm.tm_min, tm.tm_sec, 6614 (int) ts.tv_nsec / 1000000); 6615 #else 6616 p += snprintf(p, end - p, 6617 "%4d/%02d/%02d %02d:%02d:%02d ", 6618 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6619 tm.tm_hour, tm.tm_min, tm.tm_sec); 6620 #endif 6621 
6622 p += snprintf(p, end - p, 6623 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 6624 (int) pid, 6625 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 6626 6627 return p; 6628 } 6629 6630 6631 static void * 6632 nxt_unit_lvlhsh_alloc(void *data, size_t size) 6633 { 6634 int err; 6635 void *p; 6636 6637 err = posix_memalign(&p, size, size); 6638 6639 if (nxt_fast_path(err == 0)) { 6640 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", 6641 (int) size, (int) size, p); 6642 return p; 6643 } 6644 6645 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", 6646 (int) size, (int) size, strerror(err), err); 6647 return NULL; 6648 } 6649 6650 6651 static void 6652 nxt_unit_lvlhsh_free(void *data, void *p) 6653 { 6654 nxt_unit_free(NULL, p); 6655 } 6656 6657 6658 void * 6659 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) 6660 { 6661 void *p; 6662 6663 p = malloc(size); 6664 6665 if (nxt_fast_path(p != NULL)) { 6666 #if (NXT_DEBUG_ALLOC) 6667 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); 6668 #endif 6669 6670 } else { 6671 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", 6672 (int) size, strerror(errno), errno); 6673 } 6674 6675 return p; 6676 } 6677 6678 6679 void 6680 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) 6681 { 6682 #if (NXT_DEBUG_ALLOC) 6683 nxt_unit_debug(ctx, "free(%p)", p); 6684 #endif 6685 6686 free(p); 6687 } 6688 6689 6690 static int 6691 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) 6692 { 6693 u_char c1, c2; 6694 nxt_int_t n; 6695 const u_char *s1, *s2; 6696 6697 s1 = p1; 6698 s2 = p2; 6699 6700 while (length-- != 0) { 6701 c1 = *s1++; 6702 c2 = *s2++; 6703 6704 c1 = nxt_lowcase(c1); 6705 c2 = nxt_lowcase(c2); 6706 6707 n = c1 - c2; 6708 6709 if (n != 0) { 6710 return n; 6711 } 6712 } 6713 6714 return 0; 6715 } 6716