1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 120 unchanged lines hidden (view full) ---

129static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
130 void *data);
131static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
132 void *data);
133static void nxt_router_process_http_request(nxt_task_t *task,
134 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
135static void nxt_router_process_http_request_mp(nxt_task_t *task,
136 nxt_req_app_link_t *ra, nxt_port_t *port);
137static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
138 nxt_app_wmsg_t *wmsg);
139static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
140 nxt_app_wmsg_t *wmsg);
141static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
142 nxt_app_wmsg_t *wmsg);
143static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
144static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
145static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
146static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
147static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
148static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
149
150static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
151 const char* fmt, ...);
152
153static nxt_router_t *nxt_router;
154
155
156static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
157 nxt_python_prepare_msg,
158 nxt_php_prepare_msg,
159 nxt_go_prepare_msg,
160};
161
162
163nxt_int_t
164nxt_router_start(nxt_task_t *task, void *data)
165{
166 nxt_int_t ret;
167 nxt_router_t *router;
168 nxt_runtime_t *rt;
169
170 rt = task->thread->runtime;

--- 492 unchanged lines hidden (view full) ---

663 nxt_str_t name;
664 nxt_app_t *app, *prev;
665 nxt_app_type_t type;
666 nxt_sockaddr_t *sa;
667 nxt_conf_value_t *conf, *http;
668 nxt_conf_value_t *applications, *application;
669 nxt_conf_value_t *listeners, *listener;
670 nxt_socket_conf_t *skcf;
671 nxt_app_lang_module_t *lang;
672 nxt_router_app_conf_t apcf;
673 nxt_router_listener_conf_t lscf;
674
675 static nxt_str_t http_path = nxt_string("/http");
676 static nxt_str_t applications_path = nxt_string("/applications");
677 static nxt_str_t listeners_path = nxt_string("/listeners");
678
679 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);

--- 65 unchanged lines hidden (view full) ---

745 if (ret != NXT_OK) {
746 nxt_log(task, NXT_LOG_CRIT, "application map error");
747 goto app_fail;
748 }
749
750 nxt_debug(task, "application type: %V", &apcf.type);
751 nxt_debug(task, "application workers: %D", apcf.workers);
752
738 type = nxt_app_parse_type(&apcf.type);
753 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
754
740 if (type == NXT_APP_UNKNOWN) {
755 if (lang == NULL) {
756 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
757 &apcf.type);
758 goto app_fail;
759 }
760
746 if (nxt_app_modules[type] == NULL) {
761 nxt_debug(task, "application language module: \"%s\"", lang->file);
762
763 type = nxt_app_parse_type(&lang->type);
764
765 if (type == NXT_APP_UNKNOWN) {
766 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
767 &lang->type);
768 goto app_fail;
769 }
770
771 if (nxt_app_prepare_msg[type] == NULL) {
772 nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
748 &apcf.type);
773 &lang->type);
774 goto app_fail;
775 }
776
777 ret = nxt_thread_mutex_create(&app->mutex);
778 if (ret != NXT_OK) {
779 goto app_fail;
780 }
781
782 nxt_queue_init(&app->ports);
783 nxt_queue_init(&app->requests);
784
785 app->name.length = name.length;
786 nxt_memcpy(app->name.start, name.start, name.length);
787
788 app->type = type;
789 app->max_workers = apcf.workers;
790 app->live = 1;
766 app->module = nxt_app_modules[type];
791 app->prepare_msg = nxt_app_prepare_msg[type];
792
793 nxt_queue_insert_tail(&tmcf->apps, &app->link);
794 }
795
796 http = nxt_conf_get_path(conf, &http_path);
797#if 0
798 if (http == NULL) {
799 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");

--- 1907 unchanged lines hidden (view full) ---

2707 nxt_process_connected_port_add(port->process, reply_port);
2708 }
2709
2710 wmsg.port = port;
2711 wmsg.write = NULL;
2712 wmsg.buf = &wmsg.write;
2713 wmsg.stream = ra->req_id;
2714
2690 res = port->app->module->prepare_msg(task, &ap->r, &wmsg);
2715 res = port->app->prepare_msg(task, &ap->r, &wmsg);
2716
2717 if (nxt_slow_path(res != NXT_OK)) {
2718 nxt_router_gen_error(task, c, 500,
2719 "Failed to prepare message for application");
2720 return;
2721 }
2722
2723 nxt_debug(task, "about to send %d bytes buffer to worker port %d",

--- 6 unchanged lines hidden (view full) ---

2730 if (nxt_slow_path(res != NXT_OK)) {
2731 nxt_router_gen_error(task, c, 500,
2732 "Failed to send message to application");
2733 return;
2734 }
2735}
2736
2737
2738static nxt_int_t
2739nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2740 nxt_app_wmsg_t *wmsg)
2741{
2742 nxt_int_t rc;
2743 nxt_buf_t *b;
2744 nxt_http_field_t *field;
2745 nxt_app_request_header_t *h;
2746
2747 static const nxt_str_t prefix = nxt_string("HTTP_");
2748 static const nxt_str_t eof = nxt_null_string;
2749
2750 h = &r->header;
2751
2752#define RC(S) \
2753 do { \
2754 rc = (S); \
2755 if (nxt_slow_path(rc != NXT_OK)) { \
2756 goto fail; \
2757 } \
2758 } while(0)
2759
2760#define NXT_WRITE(N) \
2761 RC(nxt_app_msg_write_str(task, wmsg, N))
2762
2763 /* TODO error handle, async mmap buffer assignment */
2764
2765 NXT_WRITE(&h->method);
2766 NXT_WRITE(&h->target);
2767 if (h->path.start == h->target.start) {
2768 NXT_WRITE(&eof);
2769 } else {
2770 NXT_WRITE(&h->path);
2771 }
2772
2773 if (h->query.start != NULL) {
2774 RC(nxt_app_msg_write_size(task, wmsg,
2775 h->query.start - h->target.start + 1));
2776 } else {
2777 RC(nxt_app_msg_write_size(task, wmsg, 0));
2778 }
2779
2780 NXT_WRITE(&h->version);
2781
2782 NXT_WRITE(&r->remote);
2783
2784 NXT_WRITE(&h->host);
2785 NXT_WRITE(&h->content_type);
2786 NXT_WRITE(&h->content_length);
2787
2788 nxt_list_each(field, h->fields) {
2789 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2790 &prefix, &field->name));
2791 NXT_WRITE(&field->value);
2792
2793 } nxt_list_loop;
2794
2795 /* end-of-headers mark */
2796 NXT_WRITE(&eof);
2797
2798 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2799
2800 for(b = r->body.buf; b != NULL; b = b->next) {
2801 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2802 nxt_buf_mem_used_size(&b->mem)));
2803 }
2804
2805#undef NXT_WRITE
2806#undef RC
2807
2808 return NXT_OK;
2809
2810fail:
2811
2812 return NXT_ERROR;
2813}
2814
2815
2816static nxt_int_t
2817nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2818 nxt_app_wmsg_t *wmsg)
2819{
2820 nxt_int_t rc;
2821 nxt_buf_t *b;
2822 nxt_http_field_t *field;
2823 nxt_app_request_header_t *h;
2824
2825 static const nxt_str_t prefix = nxt_string("HTTP_");
2826 static const nxt_str_t eof = nxt_null_string;
2827
2828 h = &r->header;
2829
2830#define RC(S) \
2831 do { \
2832 rc = (S); \
2833 if (nxt_slow_path(rc != NXT_OK)) { \
2834 goto fail; \
2835 } \
2836 } while(0)
2837
2838#define NXT_WRITE(N) \
2839 RC(nxt_app_msg_write_str(task, wmsg, N))
2840
2841 /* TODO error handle, async mmap buffer assignment */
2842
2843 NXT_WRITE(&h->method);
2844 NXT_WRITE(&h->target);
2845 if (h->path.start == h->target.start) {
2846 NXT_WRITE(&eof);
2847 } else {
2848 NXT_WRITE(&h->path);
2849 }
2850
2851 if (h->query.start != NULL) {
2852 RC(nxt_app_msg_write_size(task, wmsg,
2853 h->query.start - h->target.start + 1));
2854 } else {
2855 RC(nxt_app_msg_write_size(task, wmsg, 0));
2856 }
2857
2858 NXT_WRITE(&h->version);
2859
2860 // PHP_SELF
2861 // SCRIPT_NAME
2862 // SCRIPT_FILENAME
2863 // DOCUMENT_ROOT
2864
2865 NXT_WRITE(&r->remote);
2866
2867 NXT_WRITE(&h->host);
2868 NXT_WRITE(&h->cookie);
2869 NXT_WRITE(&h->content_type);
2870 NXT_WRITE(&h->content_length);
2871
2872 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2873
2874 nxt_list_each(field, h->fields) {
2875 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2876 &prefix, &field->name));
2877 NXT_WRITE(&field->value);
2878
2879 } nxt_list_loop;
2880
2881 /* end-of-headers mark */
2882 NXT_WRITE(&eof);
2883
2884 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2885
2886 for(b = r->body.buf; b != NULL; b = b->next) {
2887 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2888 nxt_buf_mem_used_size(&b->mem)));
2889 }
2890
2891#undef NXT_WRITE
2892#undef RC
2893
2894 return NXT_OK;
2895
2896fail:
2897
2898 return NXT_ERROR;
2899}
2900
2901
2902static nxt_int_t
2903nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
2904{
2905 nxt_int_t rc;
2906 nxt_buf_t *b;
2907 nxt_http_field_t *field;
2908 nxt_app_request_header_t *h;
2909
2910 static const nxt_str_t eof = nxt_null_string;
2911
2912 h = &r->header;
2913
2914#define RC(S) \
2915 do { \
2916 rc = (S); \
2917 if (nxt_slow_path(rc != NXT_OK)) { \
2918 goto fail; \
2919 } \
2920 } while(0)
2921
2922#define NXT_WRITE(N) \
2923 RC(nxt_app_msg_write_str(task, wmsg, N))
2924
2925 /* TODO error handle, async mmap buffer assignment */
2926
2927 NXT_WRITE(&h->method);
2928 NXT_WRITE(&h->target);
2929 if (h->path.start == h->target.start) {
2930 NXT_WRITE(&eof);
2931 } else {
2932 NXT_WRITE(&h->path);
2933 }
2934
2935 if (h->query.start != NULL) {
2936 RC(nxt_app_msg_write_size(task, wmsg,
2937 h->query.start - h->target.start + 1));
2938 } else {
2939 RC(nxt_app_msg_write_size(task, wmsg, 0));
2940 }
2941
2942 NXT_WRITE(&h->version);
2943
2944 NXT_WRITE(&h->host);
2945 NXT_WRITE(&h->cookie);
2946 NXT_WRITE(&h->content_type);
2947 NXT_WRITE(&h->content_length);
2948
2949 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2950
2951 nxt_list_each(field, h->fields) {
2952 NXT_WRITE(&field->name);
2953 NXT_WRITE(&field->value);
2954
2955 } nxt_list_loop;
2956
2957 /* end-of-headers mark */
2958 NXT_WRITE(&eof);
2959
2960 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2961
2962 for(b = r->body.buf; b != NULL; b = b->next) {
2963 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2964 nxt_buf_mem_used_size(&b->mem)));
2965 }
2966
2967#undef NXT_WRITE
2968#undef RC
2969
2970 return NXT_OK;
2971
2972fail:
2973
2974 return NXT_ERROR;
2975}
2976
2977
2978static const nxt_conn_state_t nxt_router_conn_close_state
2979 nxt_aligned(64) =
2980{
2981 .ready_handler = nxt_router_conn_free,
2982};
2983
2984
2985static void

--- 162 unchanged lines hidden ---