nxt_application.c (56:92b4984ca3c1) nxt_application.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_application.h>
11
12
13#define NXT_PARSE_AGAIN (u_char *) -1
14
15
16static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
17static void nxt_app_thread(void *ctx);
18static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
19 nxt_log_t *log);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_application.h>
11
12
13#define NXT_PARSE_AGAIN (u_char *) -1
14
15
16static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
17static void nxt_app_thread(void *ctx);
18static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
19 nxt_log_t *log);
20static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
20static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c,
21 nxt_log_t *log);
22static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
21 nxt_log_t *log);
22static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
23static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
23static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out);
24static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
25static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
26static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
27 void *data);
28static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data);
29static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
24static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
25static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
26static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
27 void *data);
28static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data);
29static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
30static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
30static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c,
31 uintptr_t data);
31 uintptr_t data);
32static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
32static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c);
33static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
34
35
36typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
37
38struct nxt_app_http_parse_state_s {
39 u_char *pos;
40 nxt_int_t (*handler)(nxt_app_request_header_t *h, u_char *start,

--- 210 unchanged lines hidden (view full) ---

251 nxt_nanosleep(1000000000); /* 1s */
252 }
253}
254
255
256static nxt_app_request_t *
257nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
258{
33static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
34
35
36typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
37
38struct nxt_app_http_parse_state_s {
39 u_char *pos;
40 nxt_int_t (*handler)(nxt_app_request_header_t *h, u_char *start,

--- 210 unchanged lines hidden (view full) ---

251 nxt_nanosleep(1000000000); /* 1s */
252 }
253}
254
255
256static nxt_app_request_t *
257nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
258{
259 nxt_conn_t *c;
259 nxt_mem_pool_t *mp;
260 nxt_mem_pool_t *mp;
260 nxt_event_conn_t *c;
261 nxt_app_request_t *r;
262
263 mp = nxt_mem_pool_create(1024);
264 if (nxt_slow_path(mp == NULL)) {
265 return NULL;
266 }
267
268 r = nxt_mem_zalloc(mp, sizeof(nxt_app_request_t));
269 if (nxt_slow_path(r == NULL)) {
270 return NULL;
271 }
272
261 nxt_app_request_t *r;
262
263 mp = nxt_mem_pool_create(1024);
264 if (nxt_slow_path(mp == NULL)) {
265 return NULL;
266 }
267
268 r = nxt_mem_zalloc(mp, sizeof(nxt_app_request_t));
269 if (nxt_slow_path(r == NULL)) {
270 return NULL;
271 }
272
273 c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
273 c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t));
274 if (nxt_slow_path(c == NULL)) {
275 return NULL;
276 }
277
278 c->socket.fd = s;
279 c->socket.data = r;
280
281 c->task.thread = nxt_thread();

--- 247 unchanged lines hidden (view full) ---

529 }
530 }
531
532 return NXT_OK;
533}
534
535
536static void
274 if (nxt_slow_path(c == NULL)) {
275 return NULL;
276 }
277
278 c->socket.fd = s;
279 c->socket.data = r;
280
281 c->task.thread = nxt_thread();

--- 247 unchanged lines hidden (view full) ---

529 }
530 }
531
532 return NXT_OK;
533}
534
535
536static void
537nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log)
537nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log)
538{
539 c->socket.write_ready = 1;
540
541 c->socket.log = &c->log;
542 c->log = *log;
543
544 /* The while loop skips possible uint32_t overflow. */
545

--- 11 unchanged lines hidden (view full) ---

557 c->max_chunk = NXT_INT32_T_MAX;
558 c->sendfile = NXT_CONN_SENDFILE_UNSET;
559
560 c->socket.read_work_queue = &thr->engine->read_work_queue;
561 c->socket.write_work_queue = &thr->engine->write_work_queue;
562 c->read_work_queue = &thr->engine->read_work_queue;
563 c->write_work_queue = &thr->engine->write_work_queue;
564
538{
539 c->socket.write_ready = 1;
540
541 c->socket.log = &c->log;
542 c->log = *log;
543
544 /* The while loop skips possible uint32_t overflow. */
545

--- 11 unchanged lines hidden (view full) ---

557 c->max_chunk = NXT_INT32_T_MAX;
558 c->sendfile = NXT_CONN_SENDFILE_UNSET;
559
560 c->socket.read_work_queue = &thr->engine->read_work_queue;
561 c->socket.write_work_queue = &thr->engine->write_work_queue;
562 c->read_work_queue = &thr->engine->read_work_queue;
563 c->write_work_queue = &thr->engine->write_work_queue;
564
565 nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
566 nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
565 nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
566 nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
567
568 nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
569}
570
571
572nxt_int_t
573nxt_app_http_read_body(nxt_app_request_t *r, u_char *start, size_t length)
574{

--- 190 unchanged lines hidden (view full) ---

765
766 nxt_app_buf_send(r->event_conn, out);
767
768 return NXT_OK;
769}
770
771
772static void
567
568 nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
569}
570
571
572nxt_int_t
573nxt_app_http_read_body(nxt_app_request_t *r, u_char *start, size_t length)
574{

--- 190 unchanged lines hidden (view full) ---

765
766 nxt_app_buf_send(r->event_conn, out);
767
768 return NXT_OK;
769}
770
771
772static void
773nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
773nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out)
774{
775 nxt_app_buf_t *ab;
776
777 ab = nxt_container_of(out, nxt_app_buf_t, buf);
778
779 nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);
780
781 nxt_event_engine_post(nxt_app_engine, &ab->work);
782}
783
784
785static void
786nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
787{
774{
775 nxt_app_buf_t *ab;
776
777 ab = nxt_container_of(out, nxt_app_buf_t, buf);
778
779 nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);
780
781 nxt_event_engine_post(nxt_app_engine, &ab->work);
782}
783
784
785static void
786nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
787{
788 nxt_buf_t *b;
789 nxt_mem_pool_t *mp;
790 nxt_event_conn_t *c;
788 nxt_buf_t *b;
789 nxt_conn_t *c;
790 nxt_mem_pool_t *mp;
791
792 c = obj;
793 b = data;
794
795 nxt_debug(task, "app delivery handler");
796
797 if (c->write != NULL) {
798 nxt_buf_chain_add(&c->write, b);

--- 16 unchanged lines hidden (view full) ---

815 nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
816 task, c, NULL);
817 return;
818 }
819
820 c->write = b;
821 c->write_state = &nxt_app_delivery_write_state;
822
791
792 c = obj;
793 b = data;
794
795 nxt_debug(task, "app delivery handler");
796
797 if (c->write != NULL) {
798 nxt_buf_chain_add(&c->write, b);

--- 16 unchanged lines hidden (view full) ---

815 nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
816 task, c, NULL);
817 return;
818 }
819
820 c->write = b;
821 c->write_state = &nxt_app_delivery_write_state;
822
823 nxt_event_conn_write(task->thread->engine, c);
823 nxt_conn_write(task->thread->engine, c);
824}
825
826
827static const nxt_event_conn_state_t nxt_app_delivery_write_state
828 nxt_aligned(64) =
829{
830 .ready_handler = nxt_app_delivery_ready,
831 .error_handler = nxt_app_delivery_error,
832
833 .timer_handler = nxt_app_delivery_timeout,
834 .timer_value = nxt_app_delivery_timer_value,
835 .timer_data = 0,
836 .timer_autoreset = 1,
837};
838
839
840static void
841nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
842{
824}
825
826
827static const nxt_event_conn_state_t nxt_app_delivery_write_state
828 nxt_aligned(64) =
829{
830 .ready_handler = nxt_app_delivery_ready,
831 .error_handler = nxt_app_delivery_error,
832
833 .timer_handler = nxt_app_delivery_timeout,
834 .timer_value = nxt_app_delivery_timer_value,
835 .timer_data = 0,
836 .timer_autoreset = 1,
837};
838
839
840static void
841nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
842{
843 nxt_buf_t *b, *next;
844 nxt_event_conn_t *c;
843 nxt_buf_t *b, *next;
844 nxt_conn_t *c;
845
846 c = obj;
847
848 nxt_debug(task, "app delivery ready");
849
850 for (b = c->write; b != NULL; b = next) {
851
852 if (nxt_buf_is_mem(b)) {

--- 17 unchanged lines hidden (view full) ---

870{
871 .ready_handler = nxt_app_close_request,
872};
873
874
875static void
876nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
877{
845
846 c = obj;
847
848 nxt_debug(task, "app delivery ready");
849
850 for (b = c->write; b != NULL; b = next) {
851
852 if (nxt_buf_is_mem(b)) {

--- 17 unchanged lines hidden (view full) ---

870{
871 .ready_handler = nxt_app_close_request,
872};
873
874
875static void
876nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
877{
878 nxt_buf_t *b, *bn, *free;
879 nxt_event_conn_t *c;
878 nxt_buf_t *b, *bn, *free;
879 nxt_conn_t *c;
880 nxt_app_request_t *r;
881
882 nxt_debug(task, "app delivery completion");
883
884 free = NULL;
885
886 for (b = nxt_app_buf_done; b; b = bn) {
887 bn = b->next;

--- 9 unchanged lines hidden (view full) ---

897 }
898
899 if (nxt_buf_is_last(b)) {
900 r = (nxt_app_request_t *) b->parent;
901
902 c = r->event_conn;
903 c->write_state = &nxt_app_delivery_close_state;
904
880 nxt_app_request_t *r;
881
882 nxt_debug(task, "app delivery completion");
883
884 free = NULL;
885
886 for (b = nxt_app_buf_done; b; b = bn) {
887 bn = b->next;

--- 9 unchanged lines hidden (view full) ---

897 }
898
899 if (nxt_buf_is_last(b)) {
900 r = (nxt_app_request_t *) b->parent;
901
902 c = r->event_conn;
903 c->write_state = &nxt_app_delivery_close_state;
904
905 nxt_event_conn_close(task->thread->engine, c);
905 nxt_conn_close(task->thread->engine, c);
906 }
907 }
908
909 nxt_app_buf_done = NULL;
910
911 if (free == NULL) {
912 return;
913 }

--- 10 unchanged lines hidden (view full) ---

924
925 (void) nxt_thread_cond_signal(&nxt_app_cond);
926}
927
928
929static void
930nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
931{
906 }
907 }
908
909 nxt_app_buf_done = NULL;
910
911 if (free == NULL) {
912 return;
913 }

--- 10 unchanged lines hidden (view full) ---

924
925 (void) nxt_thread_cond_signal(&nxt_app_cond);
926}
927
928
929static void
930nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
931{
932 nxt_event_conn_t *c;
932 nxt_conn_t *c;
933
934 c = obj;
935
936 nxt_debug(task, "app delivery error");
937
938 nxt_app_delivery_done(task, c);
939}
940
941
942static void
943nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
944{
933
934 c = obj;
935
936 nxt_debug(task, "app delivery error");
937
938 nxt_app_delivery_done(task, c);
939}
940
941
942static void
943nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
944{
945 nxt_event_conn_t *c;
945 nxt_conn_t *c;
946
947 c = obj;
948
949 nxt_debug(task, "app delivery timeout");
950
951 nxt_app_delivery_done(task, c);
952}
953
954
955static nxt_msec_t
946
947 c = obj;
948
949 nxt_debug(task, "app delivery timeout");
950
951 nxt_app_delivery_done(task, c);
952}
953
954
955static nxt_msec_t
956nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data)
956nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data)
957{
958 /* 30000 ms */
959 return 30000;
960}
961
962
963static void
957{
958 /* 30000 ms */
959 return 30000;
960}
961
962
963static void
964nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
964nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c)
965{
966 if (c->write == NULL) {
967 return;
968 }
969
970 nxt_debug(task, "app delivery done");
971
972 nxt_buf_chain_add(&nxt_app_buf_done, c->write);
973
974 c->write = NULL;
975
976 nxt_work_queue_add(c->write_work_queue,
977 nxt_app_delivery_completion, task, c, NULL);
978}
979
980
981static void
982nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
983{
965{
966 if (c->write == NULL) {
967 return;
968 }
969
970 nxt_debug(task, "app delivery done");
971
972 nxt_buf_chain_add(&nxt_app_buf_done, c->write);
973
974 c->write = NULL;
975
976 nxt_work_queue_add(c->write_work_queue,
977 nxt_app_delivery_completion, task, c, NULL);
978}
979
980
981static void
982nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
983{
984 nxt_event_conn_t *c;
984 nxt_conn_t *c;
985 nxt_app_request_t *r;
986
987 c = obj;
988
989 nxt_debug(task, "app close connection");
990
991 r = c->socket.data;
992
993 nxt_mem_pool_destroy(c->mem_pool);
994 nxt_mem_pool_destroy(r->mem_pool);
995}
985 nxt_app_request_t *r;
986
987 c = obj;
988
989 nxt_debug(task, "app close connection");
990
991 r = c->socket.data;
992
993 nxt_mem_pool_destroy(c->mem_pool);
994 nxt_mem_pool_destroy(r->mem_pool);
995}