nxt_controller.c (60:b80bfbd9bddc) nxt_controller.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>

--- 20 unchanged lines hidden (view full) ---

29 nxt_str_t status_line;
30 nxt_conf_json_value_t *json_value;
31 nxt_str_t json_string;
32} nxt_controller_response_t;
33
34
35static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
36static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>

--- 20 unchanged lines hidden (view full) ---

29 nxt_str_t status_line;
30 nxt_conf_json_value_t *json_value;
31 nxt_str_t json_string;
32} nxt_controller_response_t;
33
34
35static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
36static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
37static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c,
37static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
38 uintptr_t data);
39static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
40 void *data);
41static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
42 void *data);
43static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
44 void *data);
45static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
46static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
47 void *data);
48static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
49 void *data);
50static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
51static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
52
53static nxt_int_t nxt_controller_request_content_length(void *ctx,
54 nxt_http_field_t *field, uintptr_t data, nxt_log_t *log);
55
56static void nxt_controller_process_request(nxt_task_t *task,
38 uintptr_t data);
39static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
40 void *data);
41static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
42 void *data);
43static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
44 void *data);
45static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
46static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
47 void *data);
48static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
49 void *data);
50static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
51static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
52
53static nxt_int_t nxt_controller_request_content_length(void *ctx,
54 nxt_http_field_t *field, uintptr_t data, nxt_log_t *log);
55
56static void nxt_controller_process_request(nxt_task_t *task,
57 nxt_event_conn_t *c, nxt_controller_request_t *r);
58static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c,
57 nxt_conn_t *c, nxt_controller_request_t *r);
58static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
59 nxt_controller_response_t *resp);
60static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
61 nxt_mem_pool_t *pool);
62
63
64static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
65 { nxt_string("Content-Length"),
66 &nxt_controller_request_content_length, 0 },

--- 112 unchanged lines hidden (view full) ---

179 ls->handler = nxt_controller_conn_init;
180
181 /*
182 * Connection memory pool chunk size is tunned to
183 * allocate the most data in one mem_pool chunk.
184 */
185 ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
186 + sizeof(nxt_event_conn_proxy_t)
59 nxt_controller_response_t *resp);
60static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
61 nxt_mem_pool_t *pool);
62
63
64static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
65 { nxt_string("Content-Length"),
66 &nxt_controller_request_content_length, 0 },

--- 112 unchanged lines hidden (view full) ---

179 ls->handler = nxt_controller_conn_init;
180
181 /*
182 * Connection memory pool chunk size is tunned to
183 * allocate the most data in one mem_pool chunk.
184 */
185 ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
186 + sizeof(nxt_event_conn_proxy_t)
187 + sizeof(nxt_event_conn_t)
187 + sizeof(nxt_conn_t)
188 + 4 * sizeof(nxt_buf_t);
189
190 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
191 return NXT_ERROR;
192 }
193
194 rt->controller_socket = ls;
195
196 return NXT_OK;
197}
198
199
200static void
201nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
202{
203 nxt_buf_t *b;
188 + 4 * sizeof(nxt_buf_t);
189
190 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
191 return NXT_ERROR;
192 }
193
194 rt->controller_socket = ls;
195
196 return NXT_OK;
197}
198
199
200static void
201nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
202{
203 nxt_buf_t *b;
204 nxt_event_conn_t *c;
204 nxt_conn_t *c;
205 nxt_event_engine_t *engine;
206 nxt_controller_request_t *r;
207
208 c = obj;
209
210 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
211
212 r = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_controller_request_t));

--- 19 unchanged lines hidden (view full) ---

232 c->socket.data = r;
233 c->socket.read_ready = 1;
234 c->read_state = &nxt_controller_conn_read_state;
235
236 engine = task->thread->engine;
237 c->read_work_queue = &engine->read_work_queue;
238 c->write_work_queue = &engine->write_work_queue;
239
205 nxt_event_engine_t *engine;
206 nxt_controller_request_t *r;
207
208 c = obj;
209
210 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
211
212 r = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_controller_request_t));

--- 19 unchanged lines hidden (view full) ---

232 c->socket.data = r;
233 c->socket.read_ready = 1;
234 c->read_state = &nxt_controller_conn_read_state;
235
236 engine = task->thread->engine;
237 c->read_work_queue = &engine->read_work_queue;
238 c->write_work_queue = &engine->write_work_queue;
239
240 nxt_event_conn_read(engine, c);
240 nxt_conn_read(engine, c);
241}
242
243
244static const nxt_event_conn_state_t nxt_controller_conn_read_state
245 nxt_aligned(64) =
246{
247 .ready_handler = nxt_controller_conn_read,
248 .close_handler = nxt_controller_conn_close,

--- 6 unchanged lines hidden (view full) ---

255
256
257static void
258nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
259{
260 size_t preread;
261 nxt_buf_t *b;
262 nxt_int_t rc;
241}
242
243
244static const nxt_event_conn_state_t nxt_controller_conn_read_state
245 nxt_aligned(64) =
246{
247 .ready_handler = nxt_controller_conn_read,
248 .close_handler = nxt_controller_conn_close,

--- 6 unchanged lines hidden (view full) ---

255
256
257static void
258nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
259{
260 size_t preread;
261 nxt_buf_t *b;
262 nxt_int_t rc;
263 nxt_event_conn_t *c;
263 nxt_conn_t *c;
264 nxt_controller_request_t *r;
265
266 c = obj;
267 r = data;
268
269 nxt_debug(task, "controller conn read");
270
271 nxt_queue_remove(&c->link);

--- 7 unchanged lines hidden (view full) ---

279
280 if (rc == NXT_AGAIN) {
281 if (nxt_buf_mem_free_size(&b->mem) == 0) {
282 nxt_log(task, NXT_LOG_ERR, "too long request headers");
283 nxt_controller_conn_close(task, c, r);
284 return;
285 }
286
264 nxt_controller_request_t *r;
265
266 c = obj;
267 r = data;
268
269 nxt_debug(task, "controller conn read");
270
271 nxt_queue_remove(&c->link);

--- 7 unchanged lines hidden (view full) ---

279
280 if (rc == NXT_AGAIN) {
281 if (nxt_buf_mem_free_size(&b->mem) == 0) {
282 nxt_log(task, NXT_LOG_ERR, "too long request headers");
283 nxt_controller_conn_close(task, c, r);
284 return;
285 }
286
287 nxt_event_conn_read(task->thread->engine, c);
287 nxt_conn_read(task->thread->engine, c);
288 return;
289 }
290
291 /* rc == NXT_ERROR */
292
293 nxt_log(task, NXT_LOG_ERR, "parsing error");
294
295 nxt_controller_conn_close(task, c, r);

--- 28 unchanged lines hidden (view full) ---

324
325 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
326
327 c->read = b;
328 }
329
330 c->read_state = &nxt_controller_conn_body_read_state;
331
288 return;
289 }
290
291 /* rc == NXT_ERROR */
292
293 nxt_log(task, NXT_LOG_ERR, "parsing error");
294
295 nxt_controller_conn_close(task, c, r);

--- 28 unchanged lines hidden (view full) ---

324
325 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
326
327 c->read = b;
328 }
329
330 c->read_state = &nxt_controller_conn_body_read_state;
331
332 nxt_event_conn_read(task->thread->engine, c);
332 nxt_conn_read(task->thread->engine, c);
333}
334
335
336static nxt_msec_t
333}
334
335
336static nxt_msec_t
337nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
337nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
338{
339 return (nxt_msec_t) data;
340}
341
342
343static void
344nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
345{
338{
339 return (nxt_msec_t) data;
340}
341
342
343static void
344nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
345{
346 nxt_event_conn_t *c;
346 nxt_conn_t *c;
347
348 c = obj;
349
350 nxt_debug(task, "controller conn read error");
351
352 nxt_controller_conn_close(task, c, data);
353}
354
355
356static void
357nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
358{
347
348 c = obj;
349
350 nxt_debug(task, "controller conn read error");
351
352 nxt_controller_conn_close(task, c, data);
353}
354
355
356static void
357nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
358{
359 nxt_timer_t *ev;
360 nxt_event_conn_t *c;
359 nxt_timer_t *timer;
360 nxt_conn_t *c;
361
361
362 ev = obj;
362 timer = obj;
363
363
364 c = nxt_event_read_timer_conn(ev);
364 c = nxt_read_timer_conn(timer);
365 c->socket.timedout = 1;
366 c->socket.closed = 1;
367
368 nxt_debug(task, "controller conn read timeout");
369
370 nxt_controller_conn_close(task, c, data);
371}
372

--- 10 unchanged lines hidden (view full) ---

383 .timer_data = 60 * 1000,
384 .timer_autoreset = 1,
385};
386
387
388static void
389nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
390{
365 c->socket.timedout = 1;
366 c->socket.closed = 1;
367
368 nxt_debug(task, "controller conn read timeout");
369
370 nxt_controller_conn_close(task, c, data);
371}
372

--- 10 unchanged lines hidden (view full) ---

383 .timer_data = 60 * 1000,
384 .timer_autoreset = 1,
385};
386
387
388static void
389nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
390{
391 size_t rest;
392 nxt_buf_t *b;
393 nxt_event_conn_t *c;
391 size_t rest;
392 nxt_buf_t *b;
393 nxt_conn_t *c;
394
395 c = obj;
396
397 nxt_debug(task, "controller conn body read");
398
399 b = c->read;
400
401 rest = nxt_buf_mem_free_size(&b->mem);
402
403 if (rest == 0) {
404 nxt_debug(task, "controller conn body read complete");
405
406 nxt_controller_process_request(task, c, data);
407 return;
408 }
409
410 nxt_debug(task, "controller conn body read again, rest: %uz", rest);
411
394
395 c = obj;
396
397 nxt_debug(task, "controller conn body read");
398
399 b = c->read;
400
401 rest = nxt_buf_mem_free_size(&b->mem);
402
403 if (rest == 0) {
404 nxt_debug(task, "controller conn body read complete");
405
406 nxt_controller_process_request(task, c, data);
407 return;
408 }
409
410 nxt_debug(task, "controller conn body read again, rest: %uz", rest);
411
412 nxt_event_conn_read(task->thread->engine, c);
412 nxt_conn_read(task->thread->engine, c);
413}
414
415
416static const nxt_event_conn_state_t nxt_controller_conn_write_state
417 nxt_aligned(64) =
418{
419 .ready_handler = nxt_controller_conn_write,
420 .error_handler = nxt_controller_conn_write_error,
421
422 .timer_handler = nxt_controller_conn_write_timeout,
423 .timer_value = nxt_controller_conn_timeout_value,
424 .timer_data = 60 * 1000,
425 .timer_autoreset = 1,
426};
427
428
429static void
430nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
431{
413}
414
415
416static const nxt_event_conn_state_t nxt_controller_conn_write_state
417 nxt_aligned(64) =
418{
419 .ready_handler = nxt_controller_conn_write,
420 .error_handler = nxt_controller_conn_write_error,
421
422 .timer_handler = nxt_controller_conn_write_timeout,
423 .timer_value = nxt_controller_conn_timeout_value,
424 .timer_data = 60 * 1000,
425 .timer_autoreset = 1,
426};
427
428
429static void
430nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
431{
432 nxt_buf_t *b;
433 nxt_event_conn_t *c;
432 nxt_buf_t *b;
433 nxt_conn_t *c;
434
435 c = obj;
436
437 nxt_debug(task, "controller conn write");
438
439 b = c->write;
440
441 if (b->mem.pos != b->mem.free) {
434
435 c = obj;
436
437 nxt_debug(task, "controller conn write");
438
439 b = c->write;
440
441 if (b->mem.pos != b->mem.free) {
442 nxt_event_conn_write(task->thread->engine, c);
442 nxt_conn_write(task->thread->engine, c);
443 return;
444 }
445
446 nxt_debug(task, "controller conn write complete");
447
448 nxt_controller_conn_close(task, c, data);
449}
450
451
452static void
453nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
454{
443 return;
444 }
445
446 nxt_debug(task, "controller conn write complete");
447
448 nxt_controller_conn_close(task, c, data);
449}
450
451
452static void
453nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
454{
455 nxt_event_conn_t *c;
455 nxt_conn_t *c;
456
457 c = obj;
458
459 nxt_debug(task, "controller conn write error");
460
461 nxt_controller_conn_close(task, c, data);
462}
463
464
465static void
466nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
467{
456
457 c = obj;
458
459 nxt_debug(task, "controller conn write error");
460
461 nxt_controller_conn_close(task, c, data);
462}
463
464
465static void
466nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
467{
468 nxt_timer_t *ev;
469 nxt_event_conn_t *c;
468 nxt_conn_t *c;
469 nxt_timer_t *timer;
470
470
471 ev = obj;
471 timer = obj;
472
472
473 c = nxt_event_write_timer_conn(ev);
473 c = nxt_write_timer_conn(timer);
474 c->socket.timedout = 1;
475 c->socket.closed = 1;
476
477 nxt_debug(task, "controller conn write timeout");
478
479 nxt_controller_conn_close(task, c, data);
480}
481
482
483static const nxt_event_conn_state_t nxt_controller_conn_close_state
484 nxt_aligned(64) =
485{
486 .ready_handler = nxt_controller_conn_free,
487};
488
489
490static void
491nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
492{
474 c->socket.timedout = 1;
475 c->socket.closed = 1;
476
477 nxt_debug(task, "controller conn write timeout");
478
479 nxt_controller_conn_close(task, c, data);
480}
481
482
483static const nxt_event_conn_state_t nxt_controller_conn_close_state
484 nxt_aligned(64) =
485{
486 .ready_handler = nxt_controller_conn_free,
487};
488
489
490static void
491nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
492{
493 nxt_event_conn_t *c;
493 nxt_conn_t *c;
494
495 c = obj;
496
497 nxt_debug(task, "controller conn close");
498
499 nxt_queue_remove(&c->link);
500
501 c->write_state = &nxt_controller_conn_close_state;
502
494
495 c = obj;
496
497 nxt_debug(task, "controller conn close");
498
499 nxt_queue_remove(&c->link);
500
501 c->write_state = &nxt_controller_conn_close_state;
502
503 nxt_event_conn_close(task->thread->engine, c);
503 nxt_conn_close(task->thread->engine, c);
504}
505
506
507static void
508nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
509{
504}
505
506
507static void
508nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
509{
510 nxt_event_conn_t *c;
510 nxt_conn_t *c;
511
512 c = obj;
513
514 nxt_debug(task, "controller conn free");
515
516 nxt_mem_pool_destroy(c->mem_pool);
517
518 //nxt_free(c);

--- 20 unchanged lines hidden (view full) ---

539
540 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
541
542 return NXT_ERROR;
543}
544
545
546static void
511
512 c = obj;
513
514 nxt_debug(task, "controller conn free");
515
516 nxt_mem_pool_destroy(c->mem_pool);
517
518 //nxt_free(c);

--- 20 unchanged lines hidden (view full) ---

539
540 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
541
542 return NXT_ERROR;
543}
544
545
546static void
547nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c,
547nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
548 nxt_controller_request_t *req)
549{
550 nxt_int_t rc;
551 nxt_str_t path;
552 nxt_uint_t status;
553 nxt_buf_mem_t *mbuf;
554 nxt_mem_pool_t *mp;
555 nxt_conf_json_op_t *ops;

--- 176 unchanged lines hidden (view full) ---

732
733 if (nxt_controller_response(task, c, &resp) != NXT_OK) {
734 nxt_controller_conn_close(task, c, req);
735 }
736}
737
738
739static nxt_int_t
548 nxt_controller_request_t *req)
549{
550 nxt_int_t rc;
551 nxt_str_t path;
552 nxt_uint_t status;
553 nxt_buf_mem_t *mbuf;
554 nxt_mem_pool_t *mp;
555 nxt_conf_json_op_t *ops;

--- 176 unchanged lines hidden (view full) ---

732
733 if (nxt_controller_response(task, c, &resp) != NXT_OK) {
734 nxt_controller_conn_close(task, c, req);
735 }
736}
737
738
739static nxt_int_t
740nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c,
740nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
741 nxt_controller_response_t *resp)
742{
743 size_t size;
744 nxt_buf_t *b;
745
746 size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length;
747
748 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);

--- 11 unchanged lines hidden (view full) ---

760
761 if (nxt_slow_path(b->next == NULL)) {
762 return NXT_ERROR;
763 }
764
765 c->write = b;
766 c->write_state = &nxt_controller_conn_write_state;
767
741 nxt_controller_response_t *resp)
742{
743 size_t size;
744 nxt_buf_t *b;
745
746 size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length;
747
748 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);

--- 11 unchanged lines hidden (view full) ---

760
761 if (nxt_slow_path(b->next == NULL)) {
762 return NXT_ERROR;
763 }
764
765 c->write = b;
766 c->write_state = &nxt_controller_conn_write_state;
767
768 nxt_event_conn_write(task->thread->engine, c);
768 nxt_conn_write(task->thread->engine, c);
769
770 return NXT_OK;
771}
772
773
774static nxt_buf_t *
775nxt_controller_response_body(nxt_controller_response_t *resp,
776 nxt_mem_pool_t *pool)

--- 37 unchanged lines hidden ---
769
770 return NXT_OK;
771}
772
773
774static nxt_buf_t *
775nxt_controller_response_body(nxt_controller_response_t *resp,
776 nxt_mem_pool_t *pool)

--- 37 unchanged lines hidden ---