9a10 > #include 38,44c39,44 < uint32_t stream; < nxt_conn_t *conn; < nxt_app_t *app; < nxt_port_t *app_port; < nxt_app_parse_ctx_t *ap; < nxt_msg_info_t msg_info; < nxt_req_app_link_t *ra; --- > uint32_t stream; > nxt_app_t *app; > nxt_port_t *app_port; > nxt_app_parse_ctx_t *ap; > nxt_msg_info_t msg_info; > nxt_req_app_link_t *ra; 46c46 < nxt_queue_link_t link; /* for nxt_conn_t.requests */ --- > nxt_queue_link_t link; /* for nxt_conn_t.requests */ 202,209d201 < static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); < static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, < void *data); < static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); < static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, < void *data); < static void nxt_router_process_http_request(nxt_task_t *task, < nxt_conn_t *c, nxt_app_parse_ctx_t *ap); 218,219d209 < static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); < static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 221,222d210 < static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); < static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 224d211 < static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 226,227c213,214 < static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, < const char* str); --- > static const nxt_http_request_state_t nxt_http_request_send_state; > static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); 248c235 < ret = nxt_app_http_init(task, rt); --- > ret = nxt_http_init(task, rt); 505,507c492,493 < nxt_mp_t *mp; < nxt_conn_t *c; < nxt_req_conn_link_t *rc; --- > nxt_mp_t *mp; > nxt_req_conn_link_t *rc; 517,518d502 < c = rc->conn; < 520c504 < nxt_router_gen_error(task, c, ra->err_code, ra->err_str); --- > nxt_http_request_error(task, rc->ap->request, ra->err_code); 527,528c511,512 < c->read_timer.handler = nxt_router_app_timeout; < nxt_timer_add(task->thread->engine, &c->read_timer, --- > rc->ap->timer.handler = nxt_router_app_timeout; > nxt_timer_add(task->thread->engine, &rc->ap->timer, 696,699d679 < < nxt_queue_remove(&rc->link); < < rc->conn = NULL; 1081a1062,1067 > nxt_string("idle_timeout"), > NXT_CONF_MAP_MSEC, > offsetof(nxt_socket_conf_t, idle_timeout), > }, > > { 1091a1078,1083 > > { > nxt_string("send_timeout"), > NXT_CONF_MAP_MSEC, > offsetof(nxt_socket_conf_t, send_timeout), > }, 1298a1291 > skcf->idle_timeout = 65000; 1300a1294 > skcf->send_timeout = 5000; 1311c1305 < skcf->listen->handler = nxt_router_conn_init; --- > skcf->listen->handler = nxt_http_conn_init; 2380,2406d2373 < static const nxt_conn_state_t nxt_router_conn_read_header_state < nxt_aligned(64) = < { < .ready_handler = nxt_router_conn_http_header_parse, < .close_handler = nxt_router_conn_close, < .error_handler = nxt_router_conn_error, < < .timer_handler = nxt_router_conn_timeout, < .timer_value = nxt_router_conn_timeout_value, < .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), < }; < < < static const nxt_conn_state_t nxt_router_conn_read_body_state < nxt_aligned(64) = < { < .ready_handler = nxt_router_conn_http_body_read, < .close_handler = nxt_router_conn_close, < .error_handler = nxt_router_conn_error, < < .timer_handler = nxt_router_conn_timeout, < .timer_value = nxt_router_conn_timeout_value, < .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), < .timer_autoreset = 1, < }; < < 2408,2451d2374 < nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) < { < size_t size; < nxt_conn_t *c; < nxt_socket_conf_t *skcf; < nxt_event_engine_t *engine; < nxt_socket_conf_joint_t *joint; < < c = obj; < joint = data; < < nxt_debug(task, "router conn init"); < < c->joint = joint; < joint->count++; < < skcf = joint->socket_conf; < c->local = skcf->sockaddr; < < size = skcf->header_buffer_size; < c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); < < c->socket.data = NULL; < < engine = task->thread->engine; < c->read_work_queue = &engine->fast_work_queue; < c->write_work_queue = &engine->fast_work_queue; < < c->read_state = &nxt_router_conn_read_header_state; < < nxt_conn_read(engine, c); < } < < < static const nxt_conn_state_t nxt_router_conn_write_state < nxt_aligned(64) = < { < .ready_handler = nxt_router_conn_ready, < .close_handler = nxt_router_conn_close, < .error_handler = nxt_router_conn_error, < }; < < < static void 2455a2379 > nxt_int_t ret; 2457,2458c2381 < nxt_conn_t *c; < nxt_event_engine_t *engine; --- > nxt_http_request_t *r; 2459a2383 > nxt_app_parse_ctx_t *ar; 2464,2465d2387 < c = rc->conn; < 2480c2402 < engine = task->thread->engine; --- > ar = rc->ap; 2482,2483d2403 < nxt_timer_disable(engine, &c->read_timer); < 2487c2407 < last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); --- > last = nxt_http_request_last_buffer(task, ar->request); 2489c2409,2411 < /* TODO pogorevaTb */ --- > nxt_app_http_req_done(task, ar); > nxt_router_rc_unlink(task, rc); > return; 2498,2499c2420,2421 < c->read_timer.handler = nxt_router_app_timeout; < nxt_timer_add(engine, &c->read_timer, rc->app->timeout); --- > ar->timer.handler = nxt_router_app_timeout; > nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); 2512,2514c2434 < if (c->write == NULL) { < c->write = b; < c->write_state = &nxt_router_conn_write_state; --- > r = ar->request; 2516c2436,2438 < nxt_conn_write(task->thread->engine, c); --- > if (r->header_sent) { > nxt_buf_chain_add(&r->out, b); > nxt_http_request_send_body(task, r, NULL); 2519c2441,2444 < nxt_debug(task, "router data attach out bufs to existing chain"); --- > ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); > if (nxt_slow_path(ret != NXT_DONE)) { > goto fail; > } 2521c2446,2460 < nxt_buf_chain_add(&c->write, b); --- > r->resp.fields = ar->resp_parser.fields; > > ret = nxt_http_fields_process(r->resp.fields, > &nxt_response_fields_hash, r); > if (nxt_slow_path(ret != NXT_OK)) { > goto fail; > } > > if (nxt_buf_mem_used_size(&b->mem) != 0) { > nxt_buf_chain_add(&r->out, b); > } > > r->state = &nxt_http_request_send_state; > > nxt_http_request_header_send(task, r); 2522a2462,2470 > > return; > > fail: > > nxt_app_http_req_done(task, ar); > nxt_router_rc_unlink(task, rc); > > nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 2525a2474,2481 > static const nxt_http_request_state_t nxt_http_request_send_state > nxt_aligned(64) = > { > .ready_handler = nxt_http_request_send_body, > .error_handler = nxt_http_request_close_handler, > }; > > 2526a2483,2499 > nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) > { > nxt_buf_t *out; > nxt_http_request_t *r; > > r = obj; > > out = r->out; > > if (out != NULL) { > r->out = NULL; > nxt_http_request_send(task, r, out); > } > } > > > static void 2565,2566c2538 < nxt_router_gen_error(task, rc->conn, 500, < "Application terminated unexpectedly"); --- > nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE); 2572,2620d2543 < nxt_inline const char * < nxt_router_text_by_code(int code) < { < switch (code) { < case 400: return "Bad request"; < case 404: return "Not found"; < case 403: return "Forbidden"; < case 408: return "Request Timeout"; < case 411: return "Length Required"; < case 413: return "Request Entity Too Large"; < case 500: < default: return "Internal server error"; < } < } < < < static nxt_buf_t * < nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, < const char* str) < { < nxt_buf_t *b, *last; < < b = nxt_buf_mem_alloc(mp, 16384, 0); < if (nxt_slow_path(b == NULL)) { < return NULL; < } < < b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, < "HTTP/1.0 %d %s\r\n" < "Content-Type: text/plain\r\n" < "Connection: close\r\n\r\n", < code, nxt_router_text_by_code(code)); < < b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); < < last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); < < if (nxt_slow_path(last == NULL)) { < nxt_mp_free(mp, b); < return NULL; < } < < nxt_buf_chain_add(&b, last); < < return b; < } < < < 2622,2657d2544 < nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, < const char* str) < { < nxt_mp_t *mp; < nxt_buf_t *b; < < /* TODO: fix when called in the middle of response */ < < nxt_log_alert(task->log, "error %d: %s", code, str); < < if (c->socket.fd == -1) { < return; < } < < mp = c->mem_pool; < < b = nxt_router_get_error_buf(task, mp, code, str); < if (nxt_slow_path(b == NULL)) { < return; < } < < if (c->write == NULL) { < c->write = b; < c->write_state = &nxt_router_conn_write_state; < < nxt_conn_write(task->thread->engine, c); < < } else { < nxt_debug(task, "router data attach out bufs to existing chain"); < < nxt_buf_chain_add(&c->write, b); < } < } < < < static void 3231,3232c3118,3119 < static void < nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) --- > void > nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar) 3234,3242c3121,3127 < size_t size; < nxt_int_t ret; < nxt_buf_t *buf; < nxt_conn_t *c; < nxt_sockaddr_t *local; < nxt_app_parse_ctx_t *ap; < nxt_app_request_body_t *b; < nxt_socket_conf_joint_t *joint; < nxt_app_request_header_t *h; --- > nxt_int_t res; > nxt_app_t *app; > nxt_port_t *port; > nxt_event_engine_t *engine; > nxt_http_request_t *r; > nxt_req_app_link_t ra_local, *ra; > nxt_req_conn_link_t *rc; 3244,3247c3129,3130 < c = obj; < ap = data; < buf = c->read; < joint = c->joint; --- > r = ar->request; > app = r->socket_conf->application; 3249,3504d3131 < nxt_debug(task, "router conn http header parse"); < < if (ap == NULL) { < ap = nxt_app_http_req_init(task); < if (nxt_slow_path(ap == NULL)) { < nxt_router_gen_error(task, c, 500, < "Failed to allocate parse context"); < return; < } < < c->socket.data = ap; < < ap->r.remote.start = nxt_sockaddr_address(c->remote); < ap->r.remote.length = c->remote->address_length; < < /* < * TODO: need an application flag to get local address < * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. < */ < local = nxt_router_local_addr(task, c); < < if (nxt_fast_path(local != NULL)) { < ap->r.local.start = nxt_sockaddr_address(local); < ap->r.local.length = local->address_length; < } < < ap->r.header.buf = buf; < } < < h = &ap->r.header; < b = &ap->r.body; < < ret = nxt_app_http_req_header_parse(task, ap, buf); < < nxt_debug(task, "http parse request header: %d", ret); < < switch (nxt_expect(NXT_DONE, ret)) { < < case NXT_DONE: < nxt_debug(task, "router request header parsing complete, " < "content length: %O, preread: %uz", < h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); < < if (b->done) { < nxt_router_process_http_request(task, c, ap); < < return; < } < < if (joint->socket_conf->max_body_size > 0 < && (size_t) h->parsed_content_length < > joint->socket_conf->max_body_size) < { < nxt_router_gen_error(task, c, 413, "Content-Length too big"); < return; < } < < if (nxt_buf_mem_free_size(&buf->mem) == 0) { < size = nxt_min(joint->socket_conf->body_buffer_size, < (size_t) h->parsed_content_length); < < buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); < if (nxt_slow_path(buf->next == NULL)) { < nxt_router_gen_error(task, c, 500, "Failed to allocate " < "buffer for request body"); < return; < } < < c->read = buf->next; < < b->preread_size += nxt_buf_mem_used_size(&buf->mem); < } < < if (b->buf == NULL) { < b->buf = c->read; < } < < c->read_state = &nxt_router_conn_read_body_state; < break; < < case NXT_ERROR: < nxt_router_gen_error(task, c, 400, "Request header parse error"); < return; < < default: /* NXT_AGAIN */ < < if (c->read->mem.free == c->read->mem.end) { < size = joint->socket_conf->large_header_buffer_size; < < if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) < || ap->r.header.bufs < >= joint->socket_conf->large_header_buffers) < { < nxt_router_gen_error(task, c, 413, < "Too long request headers"); < return; < } < < buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); < if (nxt_slow_path(buf->next == NULL)) { < nxt_router_gen_error(task, c, 500, < "Failed to allocate large header " < "buffer"); < return; < } < < ap->r.header.bufs++; < < size = c->read->mem.free - c->read->mem.pos; < < c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); < } < < } < < nxt_conn_read(task->thread->engine, c); < } < < < static nxt_sockaddr_t * < nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c) < { < int ret; < size_t size, length; < socklen_t socklen; < nxt_sockaddr_t *sa; < < if (c->local != NULL) { < return c->local; < } < < /* AF_UNIX should not get in here. */ < < switch (c->remote->u.sockaddr.sa_family) { < #if (NXT_INET6) < case AF_INET6: < socklen = sizeof(struct sockaddr_in6); < length = NXT_INET6_ADDR_STR_LEN; < size = offsetof(nxt_sockaddr_t, u) + socklen + length; < break; < #endif < case AF_INET: < default: < socklen = sizeof(struct sockaddr_in); < length = NXT_INET_ADDR_STR_LEN; < size = offsetof(nxt_sockaddr_t, u) + socklen + length; < break; < } < < sa = nxt_mp_get(c->mem_pool, size); < if (nxt_slow_path(sa == NULL)) { < return NULL; < } < < sa->socklen = socklen; < sa->length = length; < < ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); < if (nxt_slow_path(ret != 0)) { < nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); < return NULL; < } < < c->local = sa; < < nxt_sockaddr_text(sa); < < /* < * TODO: here we can adjust the end of non-freeable block < * in c->mem_pool to the end of actual sockaddr length. < */ < < return sa; < } < < < static void < nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) < { < size_t size; < nxt_int_t ret; < nxt_buf_t *buf; < nxt_conn_t *c; < nxt_app_parse_ctx_t *ap; < nxt_app_request_body_t *b; < nxt_socket_conf_joint_t *joint; < nxt_app_request_header_t *h; < < c = obj; < ap = data; < buf = c->read; < < nxt_debug(task, "router conn http body read"); < < nxt_assert(ap != NULL); < < b = &ap->r.body; < h = &ap->r.header; < < ret = nxt_app_http_req_body_read(task, ap, buf); < < nxt_debug(task, "http read request body: %d", ret); < < switch (nxt_expect(NXT_DONE, ret)) { < < case NXT_DONE: < nxt_router_process_http_request(task, c, ap); < return; < < case NXT_ERROR: < nxt_router_gen_error(task, c, 500, "Read body error"); < return; < < default: /* NXT_AGAIN */ < < if (nxt_buf_mem_free_size(&buf->mem) == 0) { < joint = c->joint; < < b->preread_size += nxt_buf_mem_used_size(&buf->mem); < < size = nxt_min(joint->socket_conf->body_buffer_size, < (size_t) h->parsed_content_length - b->preread_size); < < buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); < if (nxt_slow_path(buf->next == NULL)) { < nxt_router_gen_error(task, c, 500, "Failed to allocate " < "buffer for request body"); < return; < } < < c->read = buf->next; < } < < nxt_debug(task, "router request body read again, rest: %uz", < h->parsed_content_length - b->preread_size); < } < < nxt_conn_read(task->thread->engine, c); < } < < < static void < nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, < nxt_app_parse_ctx_t *ap) < { < nxt_int_t res; < nxt_app_t *app; < nxt_port_t *port; < nxt_event_engine_t *engine; < nxt_req_app_link_t ra_local, *ra; < nxt_req_conn_link_t *rc; < nxt_socket_conf_joint_t *joint; < < joint = c->joint; < app = joint->socket_conf->application; < 3506,3507c3133 < nxt_router_gen_error(task, c, 500, < "Application is NULL in socket_conf"); --- > nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3519,3521c3145 < nxt_router_gen_error(task, c, 500, "Failed to allocate " < "req<->conn link"); < --- > nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3526d3149 < rc->conn = c; 3531c3154 < nxt_timer_disable(engine, &c->read_timer); --- > rc->ap = ar; 3533,3540d3155 < nxt_queue_insert_tail(&c->requests, &rc->link); < < nxt_debug(task, "stream #%uD linked to conn %p at engine %p", < rc->stream, c, engine); < < rc->ap = ap; < c->socket.data = NULL; < 3915c3530 < static const nxt_conn_state_t nxt_router_conn_close_state --- > const nxt_conn_state_t nxt_router_conn_close_state 3923,3991d3537 < nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) < { < nxt_buf_t *b; < nxt_bool_t last; < nxt_conn_t *c; < nxt_work_queue_t *wq; < < nxt_debug(task, "router conn ready %p", obj); < < c = obj; < b = c->write; < < wq = &task->thread->engine->fast_work_queue; < < last = 0; < < while (b != NULL) { < if (!nxt_buf_is_sync(b)) { < if (nxt_buf_used_size(b) > 0) { < break; < } < } < < if (nxt_buf_is_last(b)) { < last = 1; < } < < nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); < < b = b->next; < } < < c->write = b; < < if (b != NULL) { < nxt_debug(task, "router conn %p has more data to write", obj); < < nxt_conn_write(task->thread->engine, c); < < } else { < nxt_debug(task, "router conn %p no more data to write, last = %d", obj, < last); < < if (last != 0) { < nxt_debug(task, "enqueue router conn close %p (ready handler)", c); < < nxt_work_queue_add(wq, nxt_router_conn_close, task, c, < c->socket.data); < } < } < } < < < static void < nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) < { < nxt_conn_t *c; < < c = obj; < < nxt_debug(task, "router conn close"); < < c->write_state = &nxt_router_conn_close_state; < < nxt_conn_close(task->thread->engine, c); < } < < < static void 4007,4008d3552 < nxt_req_conn_link_t *rc; < nxt_app_parse_ctx_t *ap; 4012d3555 < ap = data; 4016,4031d3558 < if (ap != NULL) { < nxt_app_http_req_done(task, ap); < < c->socket.data = NULL; < } < < nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { < < nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); < < nxt_router_rc_unlink(task, rc); < < nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); < < } nxt_queue_loop; < 4048,4085d3574 < nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) < { < nxt_conn_t *c; < < c = obj; < < nxt_debug(task, "router conn error"); < < if (c->socket.fd != -1) { < c->write_state = &nxt_router_conn_close_state; < < nxt_conn_close(task->thread->engine, c); < } < } < < < static void < nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) < { < nxt_conn_t *c; < nxt_timer_t *timer; < < timer = obj; < < nxt_debug(task, "router conn timeout"); < < c = nxt_read_timer_conn(timer); < < if (c->read_state == &nxt_router_conn_read_header_state) { < nxt_router_gen_error(task, c, 408, "Read header timeout"); < < } else { < nxt_router_gen_error(task, c, 408, "Read body timeout"); < } < } < < < static void 4088,4089c3577,3578 < nxt_conn_t *c; < nxt_timer_t *timer; --- > nxt_timer_t *timer; > nxt_app_parse_ctx_t *ar; 4095c3584 < c = nxt_read_timer_conn(timer); --- > ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); 4097c3586,3588 < nxt_router_gen_error(task, c, 408, "Application timeout"); --- > if (!ar->request->header_sent) { > nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); > } 4099,4109d3589 < < < static nxt_msec_t < nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) < { < nxt_socket_conf_joint_t *joint; < < joint = c->joint; < < return nxt_value_at(nxt_msec_t, joint->socket_conf, data); < }