| 41nxt_discovery_start(nxt_task_t *task, void *data) 42{ 43 nxt_buf_t *b; 44 nxt_port_t *main_port; 45 nxt_runtime_t *rt; 46 47 nxt_debug(task, "DISCOVERY"); 48 49 b = nxt_discovery_modules(task, "build/nginext.*"); 50 51 rt = task->thread->runtime; 52 main_port = rt->port_by_type[NXT_PROCESS_MASTER]; 53 54 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1, 55 0, -1, b); 56 57 return NXT_OK; 58} 59 60 61static void 62nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data) 63{ 64 nxt_mp_t *mp; 65 nxt_buf_t *b; 66 67 b = obj; 68 mp = b->data; 69 70 nxt_mp_destroy(mp); 71 72 exit(0); 73} 74 75 76static nxt_buf_t * 77nxt_discovery_modules(nxt_task_t *task, const char *path) 78{ 79 char *name; 80 u_char *p, *end; 81 size_t size; 82 glob_t glb; 83 nxt_mp_t *mp; 84 nxt_buf_t *b; 85 nxt_int_t ret; 86 nxt_uint_t i, n; 87 nxt_array_t *modules; 88 nxt_module_t *module; 89 90 b = NULL; 91 92 mp = nxt_mp_create(1024, 128, 256, 32); 93 if (mp == NULL) { 94 return b; 95 } 96 97 ret = glob(path, 0, NULL, &glb); 98 99 if (ret == 0) { 100 n = glb.gl_pathc; 101 102 modules = nxt_array_create(mp, n, sizeof(nxt_module_t)); 103 if (modules == NULL) { 104 goto fail; 105 } 106 107 for (i = 0; i < n; i++) { 108 name = glb.gl_pathv[i]; 109 110 ret = nxt_discovery_module(task, mp, modules, name); 111 if (ret != NXT_OK) { 112 goto fail; 113 } 114 } 115 116 size = sizeof("[]") - 1; 117 module = modules->elts; 118 n = modules->nelts; 119 120 for (i = 0; i < n; i++) { 121 nxt_debug(task, "module: %V %V %V", 122 &module[i].type, &module[i].version, &module[i].file); 123 124 size += sizeof("{\"type\": \"\",") - 1; 125 size += sizeof(" \"version\": \"\",") - 1; 126 size += sizeof(" \"file\": \"\"},") - 1; 127 128 size += module[i].type.length 129 + module[i].version.length 130 + module[i].file.length; 131 } 132 133 b = nxt_buf_mem_alloc(mp, size, 0); 134 if (b == NULL) { 135 goto fail; 136 } 137 138 b->completion_handler = nxt_discovery_completion_handler; 139 140 p = b->mem.free; 141 end = b->mem.end; 142 *p++ = '['; 143 144 for (i = 0; i < n; i++) { 145 p = nxt_sprintf(p, end, 146 "{\"type\": \"%V\", \"version\": \"%V\", \"file\": \"%V\"},", 147 &module[i].type, &module[i].version, &module[i].file); 148 } 149 150 *p++ = ']'; 151 b->mem.free = p; 152 } 153 154fail: 155 156 globfree(&glb); 157 158 return b; 159} 160 161 162static nxt_int_t 163nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, 164 const char *name) 165{ 166 void *dl; 167 nxt_str_t *s; 168 nxt_int_t ret; 169 nxt_uint_t i, n; 170 nxt_module_t *module; 171 nxt_application_module_t *app; 172 173 /* 174 * Only memory allocation failure should return NXT_ERROR. 175 * Any module processing errors are ignored. 176 */ 177 ret = NXT_ERROR; 178 179 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW); 180 181 if (dl == NULL) { 182 nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"", 183 name, dlerror()); 184 return NXT_OK; 185 } 186 187 app = dlsym(dl, "nxt_app_module"); 188 189 if (app != NULL) { 190 nxt_log(task, NXT_LOG_NOTICE, "module: %V \"%s\"", 191 &app->version, name); 192 193 module = modules->elts; 194 n = modules->nelts; 195 196 for (i = 0; i < n; i++) { 197 if (nxt_strstr_eq(&app->version, &module[i].version)) { 198 nxt_log(task, NXT_LOG_NOTICE, 199 "ignoring %s module with the same " 200 "application language version %V as in %s", 201 name, &module[i].version, &module[i].file); 202 203 goto done; 204 } 205 } 206 207 module = nxt_array_add(modules); 208 if (module == NULL) { 209 goto fail; 210 } 211 212 s = nxt_str_dup(mp, &module->type, &app->type); 213 if (s == NULL) { 214 goto fail; 215 } 216 217 s = nxt_str_dup(mp, &module->version, &app->version); 218 if (s == NULL) { 219 goto fail; 220 } 221 222 module->file.length = nxt_strlen(name); 223 224 module->file.start = nxt_mp_alloc(mp, module->file.length); 225 if (module->file.start == NULL) { 226 goto fail; 227 } 228 229 nxt_memcpy(module->file.start, name, module->file.length); 230 231 } else { 232 nxt_log(task, NXT_LOG_CRIT, "dlsym(\"%s\"), failed: \"%s\"", 233 name, dlerror()); 234 } 235 236done: 237 238 ret = NXT_OK; 239 240fail: 241 242 if (dlclose(dl) != 0) { 243 nxt_log(task, NXT_LOG_CRIT, "dlclose(\"%s\"), failed: \"%s\"", 244 name, dlerror()); 245 } 246 247 return ret; 248} 249 250 251nxt_int_t
|
56nxt_int_t 57nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt) 58{ 59 nxt_http_fields_hash_t *hash; 60 61 hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool); 62 if (nxt_slow_path(hash == NULL)) { 63 return NXT_ERROR; 64 } 65 66 nxt_app_request_fields_hash = hash; 67 68 return NXT_OK; 69} 70 71 72void 73nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 74{ 75 size_t dump_size; 76 nxt_buf_t *b; 77 nxt_port_t *port; 78 nxt_app_rmsg_t rmsg = { msg->buf }; 79 nxt_app_wmsg_t wmsg; 80 81 b = msg->buf; 82 dump_size = b->mem.free - b->mem.pos; 83 84 if (dump_size > 300) { 85 dump_size = 300; 86 } 87 88 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos); 89 90 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 91 msg->port_msg.reply_port); 92 if (nxt_slow_path(port == NULL)) { 93 // 94 } 95 96 wmsg.port = port; 97 wmsg.write = NULL; 98 wmsg.buf = &wmsg.write; 99 wmsg.stream = msg->port_msg.stream; 100 101 nxt_app->run(task, &rmsg, &wmsg); 102} 103 104 105nxt_inline nxt_port_t * 106nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg) 107{ 108 return msg->port; 109} 110 111 112u_char * 113nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) 114{ 115 size_t free_size; 116 u_char *res; 117 nxt_buf_t *b; 118 nxt_port_t *port; 119 120 res = NULL; 121 122 do { 123 b = *msg->buf; 124 125 if (b == NULL) { 126 port = nxt_app_msg_get_port(task, msg); 127 if (nxt_slow_path(port == NULL)) { 128 return NULL; 129 } 130 131 b = nxt_port_mmap_get_buf(task, port, size); 132 if (nxt_slow_path(b == NULL)) { 133 return NULL; 134 } 135 136 *msg->buf = b; 137 138 free_size = nxt_buf_mem_free_size(&b->mem); 139 140 if (nxt_slow_path(free_size < size)) { 141 nxt_debug(task, "requested buffer too big (%z < %z)", 142 free_size, size); 143 return NULL; 144 } 145 146 } 147 148 free_size = nxt_buf_mem_free_size(&b->mem); 149 150 if (free_size >= size) { 151 res = b->mem.free; 152 b->mem.free += size; 153 154 return res; 155 } 156 157 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) { 158 res = b->mem.free; 159 b->mem.free += size; 160 161 return res; 162 } 163 164 msg->buf = &b->next; 165 } while(1); 166} 167 168 169nxt_int_t 170nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size) 171{ 172 u_char *dst; 173 size_t dst_length; 174 175 if (c != NULL) { 176 dst_length = size + (size < 128 ? 1 : 4) + 1; 177 178 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 179 if (nxt_slow_path(dst == NULL)) { 180 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 181 dst_length); 182 return NXT_ERROR; 183 } 184 185 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */ 186 187 nxt_memcpy(dst, c, size); 188 dst[size] = 0; 189 190 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c); 191 } else { 192 dst_length = 1; 193 194 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 195 if (nxt_slow_path(dst == NULL)) { 196 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 197 dst_length); 198 return NXT_ERROR; 199 } 200 201 dst = nxt_app_msg_write_length(dst, 0); 202 203 nxt_debug(task, "nxt_app_msg_write: NULL"); 204 } 205 206 return NXT_OK; 207} 208 209 210nxt_int_t 211nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, 212 const nxt_str_t *prefix, const nxt_str_t *v) 213{ 214 u_char *dst, *src; 215 size_t i, length, dst_length; 216 217 length = prefix->length + v->length; 218 219 dst_length = length + (length < 128 ? 1 : 4) + 1; 220 221 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 222 if (nxt_slow_path(dst == NULL)) { 223 return NXT_ERROR; 224 } 225 226 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */ 227 228 nxt_memcpy(dst, prefix->start, prefix->length); 229 dst += prefix->length; 230 231 src = v->start; 232 for (i = 0; i < v->length; i++, dst++, src++) { 233 234 if (*src >= 'a' && *src <= 'z') { 235 *dst = *src & ~0x20; 236 continue; 237 } 238 239 if (*src == '-') { 240 *dst = '_'; 241 continue; 242 } 243 244 *dst = *src; 245 } 246 247 *dst = 0; 248 249 return NXT_OK; 250} 251 252 253nxt_int_t 254nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) 255{ 256 size_t length; 257 nxt_buf_t *buf; 258 259 do { 260 buf = msg->buf; 261 262 if (nxt_slow_path(buf == NULL)) { 263 return NXT_DONE; 264 } 265 266 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { 267 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 268 msg->buf = buf->next; 269 continue; 270 } 271 return NXT_ERROR; 272 } 273 274 if (buf->mem.pos[0] >= 128) { 275 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { 276 return NXT_ERROR; 277 } 278 } 279 280 break; 281 } while (1); 282 283 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length); 284 285 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) 286 { 287 return NXT_ERROR; 288 } 289 290 if (length > 0) { 291 str->start = buf->mem.pos; 292 str->length = length - 1; 293 294 buf->mem.pos += length; 295 296 nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1, 297 (int)length - 1, str->start); 298 } else { 299 str->start = NULL; 300 str->length = 0; 301 302 nxt_debug(task, "nxt_read_str: NULL"); 303 } 304 305 return NXT_OK; 306} 307 308 309size_t 310nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst, 311 size_t size) 312{ 313 size_t res, read_size; 314 nxt_buf_t *buf; 315 316 res = 0; 317 318 while (size > 0) { 319 buf = msg->buf; 320 321 if (nxt_slow_path(buf == NULL)) { 322 break; 323 } 324 325 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 326 msg->buf = buf->next; 327 continue; 328 } 329 330 read_size = nxt_buf_mem_used_size(&buf->mem); 331 read_size = nxt_min(read_size, size); 332 333 dst = nxt_cpymem(dst, buf->mem.pos, read_size); 334 335 size -= read_size; 336 buf->mem.pos += read_size; 337 res += read_size; 338 } 339 340 nxt_debug(task, "nxt_read_raw: %uz", res); 341 342 return res; 343} 344 345 346nxt_int_t 347nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, 348 nxt_str_t *v) 349{ 350 nxt_int_t rc; 351 352 rc = nxt_app_msg_read_str(task, rmsg, n); 353 if (nxt_slow_path(rc != NXT_OK)) { 354 return rc; 355 } 356 357 rc = nxt_app_msg_read_str(task, rmsg, v); 358 if (nxt_slow_path(rc != NXT_OK)) { 359 return rc; 360 } 361 362 return rc; 363} 364 365 366nxt_int_t 367nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) 368{ 369 nxt_buf_t *buf; 370 371 do { 372 buf = msg->buf; 373 374 if (nxt_slow_path(buf == NULL)) { 375 return NXT_DONE; 376 } 377 378 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { 379 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 380 msg->buf = buf->next; 381 continue; 382 } 383 return NXT_ERROR; 384 } 385 386 if (buf->mem.pos[0] >= 128) { 387 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { 388 return NXT_ERROR; 389 } 390 } 391 392 break; 393 } while (1); 394 395 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); 396 397 nxt_debug(task, "nxt_read_size: %d", (int)*size); 398 399 return NXT_OK; 400} 401 402 403static nxt_int_t 404nxt_app_request_content_length(void *ctx, nxt_http_field_t *field, 405 nxt_log_t *log) 406{ 407 nxt_str_t *v; 408 nxt_app_parse_ctx_t *c; 409 nxt_app_request_header_t *h; 410 411 c = ctx; 412 h = &c->r.header; 413 v = &field->value; 414 415 h->content_length = *v; 416 h->parsed_content_length = nxt_off_t_parse(v->start, v->length); 417 418 return NXT_OK; 419} 420 421 422static nxt_int_t 423nxt_app_request_content_type(void *ctx, nxt_http_field_t *field, 424 nxt_log_t *log) 425{ 426 nxt_app_parse_ctx_t *c; 427 nxt_app_request_header_t *h; 428 429 c = ctx; 430 h = &c->r.header; 431 432 h->content_type = field->value; 433 434 return NXT_OK; 435} 436 437 438static nxt_int_t 439nxt_app_request_cookie(void *ctx, nxt_http_field_t *field, 440 nxt_log_t *log) 441{ 442 nxt_app_parse_ctx_t *c; 443 nxt_app_request_header_t *h; 444 445 c = ctx; 446 h = &c->r.header; 447 448 h->cookie = field->value; 449 450 return NXT_OK; 451} 452 453 454static nxt_int_t 455nxt_app_request_host(void *ctx, nxt_http_field_t *field, 456 nxt_log_t *log) 457{ 458 nxt_app_parse_ctx_t *c; 459 nxt_app_request_header_t *h; 460 461 c = ctx; 462 h = &c->r.header; 463 464 h->host = field->value; 465 466 return NXT_OK; 467} 468 469 470static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = { 471 { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 }, 472 { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 }, 473 { nxt_string("Cookie"), &nxt_app_request_cookie, 0 }, 474 { nxt_string("Host"), &nxt_app_request_host, 0 }, 475 476 { nxt_null_string, NULL, 0 } 477}; 478 479 480nxt_int_t 481nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) 482{ 483 nxt_int_t rc; 484 485 ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32); 486 487 rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool); 488 if (nxt_slow_path(rc != NXT_OK)) { 489 return rc; 490 } 491 492 ctx->parser.fields_hash = nxt_app_request_fields_hash; 493 494 return NXT_OK; 495} 496 497 498nxt_int_t 499nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, 500 nxt_buf_t *buf) 501{ 502 nxt_int_t rc; 503 nxt_app_request_body_t *b; 504 nxt_http_request_parse_t *p; 505 nxt_app_request_header_t *h; 506 507 p = &ctx->parser; 508 b = &ctx->r.body; 509 h = &ctx->r.header; 510 511 nxt_assert(h->done == 0); 512 513 rc = nxt_http_parse_request(p, &buf->mem); 514 515 if (nxt_slow_path(rc != NXT_DONE)) { 516 return rc; 517 } 518 519 rc = nxt_http_fields_process(p->fields, ctx, task->log); 520 521 if (nxt_slow_path(rc != NXT_OK)) { 522 return rc; 523 } 524 525 h->fields = p->fields; 526 h->done = 1; 527 528 h->version.start = p->version.str; 529 h->version.length = nxt_strlen(p->version.str); 530 531 h->method = p->method; 532 533 h->target.start = p->target_start; 534 h->target.length = p->target_end - p->target_start; 535 536 h->path = p->path; 537 h->query = p->args; 538 539 if (h->parsed_content_length == 0) { 540 b->done = 1; 541 542 } 543 544 if (buf->mem.free == buf->mem.pos) { 545 return NXT_DONE; 546 } 547 548 b->buf = buf; 549 b->done = nxt_buf_mem_used_size(&buf->mem) >= 550 h->parsed_content_length; 551 552 if (b->done == 1) { 553 b->preread_size = nxt_buf_mem_used_size(&buf->mem); 554 } 555 556 return NXT_DONE; 557} 558 559 560nxt_int_t 561nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, 562 nxt_buf_t *buf) 563{ 564 nxt_app_request_body_t *b; 565 nxt_app_request_header_t *h; 566 567 b = &ctx->r.body; 568 h = &ctx->r.header; 569 570 nxt_assert(h->done == 1); 571 nxt_assert(b->done == 0); 572 573 b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >= 574 (size_t) h->parsed_content_length; 575 576 if (b->done == 1) { 577 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 578 } 579 580 return b->done == 1 ? NXT_DONE : NXT_AGAIN; 581} 582 583 584nxt_int_t 585nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) 586{ 587 nxt_mp_destroy(ctx->mem_pool); 588 589 return NXT_OK; 590} 591 592 593nxt_int_t 594nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) 595{ 596 nxt_int_t rc; 597 nxt_buf_t *b; 598 nxt_port_t *port; 599 600 rc = NXT_OK; 601 602 port = nxt_app_msg_get_port(task, msg); 603 if (nxt_slow_path(port == NULL)) { 604 return NXT_ERROR; 605 } 606 607 if (nxt_slow_path(last == 1)) { 608 do { 609 b = *msg->buf; 610 611 if (b == NULL) { 612 b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); 613 *msg->buf = b; 614 break; 615 } 616 617 msg->buf = &b->next; 618 } while(1); 619 } 620 621 if (nxt_slow_path(msg->write != NULL)) { 622 rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, 623 -1, msg->stream, 0, msg->write); 624 625 msg->write = NULL; 626 msg->buf = &msg->write; 627 } 628 629 return rc; 630} 631 632 633nxt_int_t 634nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, 635 size_t size) 636{ 637 size_t free_size, copy_size; 638 nxt_buf_t *b; 639 nxt_port_t *port; 640 641 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); 642 643 while (size > 0) { 644 b = *msg->buf; 645 646 if (b == NULL) { 647 port = nxt_app_msg_get_port(task, msg); 648 if (nxt_slow_path(port == NULL)) { 649 return NXT_ERROR; 650 } 651 652 b = nxt_port_mmap_get_buf(task, port, size); 653 if (nxt_slow_path(b == NULL)) { 654 return NXT_ERROR; 655 } 656 657 *msg->buf = b; 658 } 659 660 do { 661 free_size = nxt_buf_mem_free_size(&b->mem); 662 663 if (free_size > 0) { 664 copy_size = nxt_min(free_size, size); 665 666 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size); 667 668 size -= copy_size; 669 c += copy_size; 670 671 if (size == 0) { 672 return NXT_OK; 673 } 674 } 675 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK); 676 677 msg->buf = &b->next; 678 } 679 680 return NXT_OK; 681} 682 683
| 315nxt_int_t 316nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt) 317{ 318 nxt_http_fields_hash_t *hash; 319 320 hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool); 321 if (nxt_slow_path(hash == NULL)) { 322 return NXT_ERROR; 323 } 324 325 nxt_app_request_fields_hash = hash; 326 327 return NXT_OK; 328} 329 330 331void 332nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 333{ 334 size_t dump_size; 335 nxt_buf_t *b; 336 nxt_port_t *port; 337 nxt_app_rmsg_t rmsg = { msg->buf }; 338 nxt_app_wmsg_t wmsg; 339 340 b = msg->buf; 341 dump_size = b->mem.free - b->mem.pos; 342 343 if (dump_size > 300) { 344 dump_size = 300; 345 } 346 347 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos); 348 349 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 350 msg->port_msg.reply_port); 351 if (nxt_slow_path(port == NULL)) { 352 // 353 } 354 355 wmsg.port = port; 356 wmsg.write = NULL; 357 wmsg.buf = &wmsg.write; 358 wmsg.stream = msg->port_msg.stream; 359 360 nxt_app->run(task, &rmsg, &wmsg); 361} 362 363 364nxt_inline nxt_port_t * 365nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg) 366{ 367 return msg->port; 368} 369 370 371u_char * 372nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) 373{ 374 size_t free_size; 375 u_char *res; 376 nxt_buf_t *b; 377 nxt_port_t *port; 378 379 res = NULL; 380 381 do { 382 b = *msg->buf; 383 384 if (b == NULL) { 385 port = nxt_app_msg_get_port(task, msg); 386 if (nxt_slow_path(port == NULL)) { 387 return NULL; 388 } 389 390 b = nxt_port_mmap_get_buf(task, port, size); 391 if (nxt_slow_path(b == NULL)) { 392 return NULL; 393 } 394 395 *msg->buf = b; 396 397 free_size = nxt_buf_mem_free_size(&b->mem); 398 399 if (nxt_slow_path(free_size < size)) { 400 nxt_debug(task, "requested buffer too big (%z < %z)", 401 free_size, size); 402 return NULL; 403 } 404 405 } 406 407 free_size = nxt_buf_mem_free_size(&b->mem); 408 409 if (free_size >= size) { 410 res = b->mem.free; 411 b->mem.free += size; 412 413 return res; 414 } 415 416 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) { 417 res = b->mem.free; 418 b->mem.free += size; 419 420 return res; 421 } 422 423 msg->buf = &b->next; 424 } while(1); 425} 426 427 428nxt_int_t 429nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size) 430{ 431 u_char *dst; 432 size_t dst_length; 433 434 if (c != NULL) { 435 dst_length = size + (size < 128 ? 1 : 4) + 1; 436 437 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 438 if (nxt_slow_path(dst == NULL)) { 439 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 440 dst_length); 441 return NXT_ERROR; 442 } 443 444 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */ 445 446 nxt_memcpy(dst, c, size); 447 dst[size] = 0; 448 449 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c); 450 } else { 451 dst_length = 1; 452 453 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 454 if (nxt_slow_path(dst == NULL)) { 455 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 456 dst_length); 457 return NXT_ERROR; 458 } 459 460 dst = nxt_app_msg_write_length(dst, 0); 461 462 nxt_debug(task, "nxt_app_msg_write: NULL"); 463 } 464 465 return NXT_OK; 466} 467 468 469nxt_int_t 470nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, 471 const nxt_str_t *prefix, const nxt_str_t *v) 472{ 473 u_char *dst, *src; 474 size_t i, length, dst_length; 475 476 length = prefix->length + v->length; 477 478 dst_length = length + (length < 128 ? 1 : 4) + 1; 479 480 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 481 if (nxt_slow_path(dst == NULL)) { 482 return NXT_ERROR; 483 } 484 485 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */ 486 487 nxt_memcpy(dst, prefix->start, prefix->length); 488 dst += prefix->length; 489 490 src = v->start; 491 for (i = 0; i < v->length; i++, dst++, src++) { 492 493 if (*src >= 'a' && *src <= 'z') { 494 *dst = *src & ~0x20; 495 continue; 496 } 497 498 if (*src == '-') { 499 *dst = '_'; 500 continue; 501 } 502 503 *dst = *src; 504 } 505 506 *dst = 0; 507 508 return NXT_OK; 509} 510 511 512nxt_int_t 513nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) 514{ 515 size_t length; 516 nxt_buf_t *buf; 517 518 do { 519 buf = msg->buf; 520 521 if (nxt_slow_path(buf == NULL)) { 522 return NXT_DONE; 523 } 524 525 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { 526 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 527 msg->buf = buf->next; 528 continue; 529 } 530 return NXT_ERROR; 531 } 532 533 if (buf->mem.pos[0] >= 128) { 534 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { 535 return NXT_ERROR; 536 } 537 } 538 539 break; 540 } while (1); 541 542 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length); 543 544 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) 545 { 546 return NXT_ERROR; 547 } 548 549 if (length > 0) { 550 str->start = buf->mem.pos; 551 str->length = length - 1; 552 553 buf->mem.pos += length; 554 555 nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1, 556 (int)length - 1, str->start); 557 } else { 558 str->start = NULL; 559 str->length = 0; 560 561 nxt_debug(task, "nxt_read_str: NULL"); 562 } 563 564 return NXT_OK; 565} 566 567 568size_t 569nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst, 570 size_t size) 571{ 572 size_t res, read_size; 573 nxt_buf_t *buf; 574 575 res = 0; 576 577 while (size > 0) { 578 buf = msg->buf; 579 580 if (nxt_slow_path(buf == NULL)) { 581 break; 582 } 583 584 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 585 msg->buf = buf->next; 586 continue; 587 } 588 589 read_size = nxt_buf_mem_used_size(&buf->mem); 590 read_size = nxt_min(read_size, size); 591 592 dst = nxt_cpymem(dst, buf->mem.pos, read_size); 593 594 size -= read_size; 595 buf->mem.pos += read_size; 596 res += read_size; 597 } 598 599 nxt_debug(task, "nxt_read_raw: %uz", res); 600 601 return res; 602} 603 604 605nxt_int_t 606nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, 607 nxt_str_t *v) 608{ 609 nxt_int_t rc; 610 611 rc = nxt_app_msg_read_str(task, rmsg, n); 612 if (nxt_slow_path(rc != NXT_OK)) { 613 return rc; 614 } 615 616 rc = nxt_app_msg_read_str(task, rmsg, v); 617 if (nxt_slow_path(rc != NXT_OK)) { 618 return rc; 619 } 620 621 return rc; 622} 623 624 625nxt_int_t 626nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) 627{ 628 nxt_buf_t *buf; 629 630 do { 631 buf = msg->buf; 632 633 if (nxt_slow_path(buf == NULL)) { 634 return NXT_DONE; 635 } 636 637 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { 638 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 639 msg->buf = buf->next; 640 continue; 641 } 642 return NXT_ERROR; 643 } 644 645 if (buf->mem.pos[0] >= 128) { 646 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { 647 return NXT_ERROR; 648 } 649 } 650 651 break; 652 } while (1); 653 654 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); 655 656 nxt_debug(task, "nxt_read_size: %d", (int)*size); 657 658 return NXT_OK; 659} 660 661 662static nxt_int_t 663nxt_app_request_content_length(void *ctx, nxt_http_field_t *field, 664 nxt_log_t *log) 665{ 666 nxt_str_t *v; 667 nxt_app_parse_ctx_t *c; 668 nxt_app_request_header_t *h; 669 670 c = ctx; 671 h = &c->r.header; 672 v = &field->value; 673 674 h->content_length = *v; 675 h->parsed_content_length = nxt_off_t_parse(v->start, v->length); 676 677 return NXT_OK; 678} 679 680 681static nxt_int_t 682nxt_app_request_content_type(void *ctx, nxt_http_field_t *field, 683 nxt_log_t *log) 684{ 685 nxt_app_parse_ctx_t *c; 686 nxt_app_request_header_t *h; 687 688 c = ctx; 689 h = &c->r.header; 690 691 h->content_type = field->value; 692 693 return NXT_OK; 694} 695 696 697static nxt_int_t 698nxt_app_request_cookie(void *ctx, nxt_http_field_t *field, 699 nxt_log_t *log) 700{ 701 nxt_app_parse_ctx_t *c; 702 nxt_app_request_header_t *h; 703 704 c = ctx; 705 h = &c->r.header; 706 707 h->cookie = field->value; 708 709 return NXT_OK; 710} 711 712 713static nxt_int_t 714nxt_app_request_host(void *ctx, nxt_http_field_t *field, 715 nxt_log_t *log) 716{ 717 nxt_app_parse_ctx_t *c; 718 nxt_app_request_header_t *h; 719 720 c = ctx; 721 h = &c->r.header; 722 723 h->host = field->value; 724 725 return NXT_OK; 726} 727 728 729static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = { 730 { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 }, 731 { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 }, 732 { nxt_string("Cookie"), &nxt_app_request_cookie, 0 }, 733 { nxt_string("Host"), &nxt_app_request_host, 0 }, 734 735 { nxt_null_string, NULL, 0 } 736}; 737 738 739nxt_int_t 740nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) 741{ 742 nxt_int_t rc; 743 744 ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32); 745 746 rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool); 747 if (nxt_slow_path(rc != NXT_OK)) { 748 return rc; 749 } 750 751 ctx->parser.fields_hash = nxt_app_request_fields_hash; 752 753 return NXT_OK; 754} 755 756 757nxt_int_t 758nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, 759 nxt_buf_t *buf) 760{ 761 nxt_int_t rc; 762 nxt_app_request_body_t *b; 763 nxt_http_request_parse_t *p; 764 nxt_app_request_header_t *h; 765 766 p = &ctx->parser; 767 b = &ctx->r.body; 768 h = &ctx->r.header; 769 770 nxt_assert(h->done == 0); 771 772 rc = nxt_http_parse_request(p, &buf->mem); 773 774 if (nxt_slow_path(rc != NXT_DONE)) { 775 return rc; 776 } 777 778 rc = nxt_http_fields_process(p->fields, ctx, task->log); 779 780 if (nxt_slow_path(rc != NXT_OK)) { 781 return rc; 782 } 783 784 h->fields = p->fields; 785 h->done = 1; 786 787 h->version.start = p->version.str; 788 h->version.length = nxt_strlen(p->version.str); 789 790 h->method = p->method; 791 792 h->target.start = p->target_start; 793 h->target.length = p->target_end - p->target_start; 794 795 h->path = p->path; 796 h->query = p->args; 797 798 if (h->parsed_content_length == 0) { 799 b->done = 1; 800 801 } 802 803 if (buf->mem.free == buf->mem.pos) { 804 return NXT_DONE; 805 } 806 807 b->buf = buf; 808 b->done = nxt_buf_mem_used_size(&buf->mem) >= 809 h->parsed_content_length; 810 811 if (b->done == 1) { 812 b->preread_size = nxt_buf_mem_used_size(&buf->mem); 813 } 814 815 return NXT_DONE; 816} 817 818 819nxt_int_t 820nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, 821 nxt_buf_t *buf) 822{ 823 nxt_app_request_body_t *b; 824 nxt_app_request_header_t *h; 825 826 b = &ctx->r.body; 827 h = &ctx->r.header; 828 829 nxt_assert(h->done == 1); 830 nxt_assert(b->done == 0); 831 832 b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >= 833 (size_t) h->parsed_content_length; 834 835 if (b->done == 1) { 836 b->preread_size += nxt_buf_mem_used_size(&buf->mem); 837 } 838 839 return b->done == 1 ? NXT_DONE : NXT_AGAIN; 840} 841 842 843nxt_int_t 844nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) 845{ 846 nxt_mp_destroy(ctx->mem_pool); 847 848 return NXT_OK; 849} 850 851 852nxt_int_t 853nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) 854{ 855 nxt_int_t rc; 856 nxt_buf_t *b; 857 nxt_port_t *port; 858 859 rc = NXT_OK; 860 861 port = nxt_app_msg_get_port(task, msg); 862 if (nxt_slow_path(port == NULL)) { 863 return NXT_ERROR; 864 } 865 866 if (nxt_slow_path(last == 1)) { 867 do { 868 b = *msg->buf; 869 870 if (b == NULL) { 871 b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); 872 *msg->buf = b; 873 break; 874 } 875 876 msg->buf = &b->next; 877 } while(1); 878 } 879 880 if (nxt_slow_path(msg->write != NULL)) { 881 rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, 882 -1, msg->stream, 0, msg->write); 883 884 msg->write = NULL; 885 msg->buf = &msg->write; 886 } 887 888 return rc; 889} 890 891 892nxt_int_t 893nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, 894 size_t size) 895{ 896 size_t free_size, copy_size; 897 nxt_buf_t *b; 898 nxt_port_t *port; 899 900 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); 901 902 while (size > 0) { 903 b = *msg->buf; 904 905 if (b == NULL) { 906 port = nxt_app_msg_get_port(task, msg); 907 if (nxt_slow_path(port == NULL)) { 908 return NXT_ERROR; 909 } 910 911 b = nxt_port_mmap_get_buf(task, port, size); 912 if (nxt_slow_path(b == NULL)) { 913 return NXT_ERROR; 914 } 915 916 *msg->buf = b; 917 } 918 919 do { 920 free_size = nxt_buf_mem_free_size(&b->mem); 921 922 if (free_size > 0) { 923 copy_size = nxt_min(free_size, size); 924 925 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size); 926 927 size -= copy_size; 928 c += copy_size; 929 930 if (size == 0) { 931 return NXT_OK; 932 } 933 } 934 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK); 935 936 msg->buf = &b->next; 937 } 938 939 return NXT_OK; 940} 941 942
|