xref: /unit/src/ruby/nxt_ruby.c (revision 1814:05a8e3eb6244)
1 /*
2  * Copyright (C) Alexander Borisov
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <ruby/nxt_ruby.h>
7 
8 #include <nxt_unit.h>
9 #include <nxt_unit_request.h>
10 
11 #include <ruby/thread.h>
12 
13 #include NXT_RUBY_MOUNTS_H
14 
15 #include <locale.h>
16 
17 
18 #define NXT_RUBY_RACK_API_VERSION_MAJOR  1
19 #define NXT_RUBY_RACK_API_VERSION_MINOR  3
20 
21 
22 typedef struct {
23     nxt_task_t      *task;
24     nxt_str_t       *script;
25     nxt_ruby_ctx_t  *rctx;
26 } nxt_ruby_rack_init_t;
27 
28 
29 static nxt_int_t nxt_ruby_start(nxt_task_t *task,
30     nxt_process_data_t *data);
31 static VALUE nxt_ruby_init_basic(VALUE arg);
32 static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
33 
34 static VALUE nxt_ruby_require_rubygems(VALUE arg);
35 static VALUE nxt_ruby_bundler_setup(VALUE arg);
36 static VALUE nxt_ruby_require_rack(VALUE arg);
37 static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
38 static VALUE nxt_ruby_rack_env_create(VALUE arg);
39 static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
40 static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
41 static void *nxt_ruby_request_handler_gvl(void *req);
42 static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
43 static void *nxt_ruby_thread_create_gvl(void *rctx);
44 static VALUE nxt_ruby_thread_func(VALUE arg);
45 static void *nxt_ruby_unit_run(void *ctx);
46 static void nxt_ruby_ubf(void *ctx);
47 static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
48 static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
49     nxt_ruby_app_conf_t *c);
50 
51 static VALUE nxt_ruby_rack_app_run(VALUE arg);
52 static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
53 nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
54     nxt_unit_sptr_t *sptr, uint32_t len);
55 static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
56     VALUE result);
57 static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
58     VALUE result, nxt_int_t status);
59 static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
60 static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
61 static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
62     VALUE result);
63 static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
64     VALUE filepath);
65 static void *nxt_ruby_response_write_cb(void *read_info);
66 static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
67     int argc, const VALUE *argv, VALUE blockarg);
68 static void *nxt_ruby_response_write(void *body);
69 
70 static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
71     uint32_t level, const char *desc);
72 
73 static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
74 static void nxt_ruby_atexit(void);
75 
76 
77 static uint32_t  compat[] = {
78     NXT_VERNUM, NXT_DEBUG,
79 };
80 
81 static VALUE  nxt_ruby_rackup;
82 static VALUE  nxt_ruby_call;
83 
84 static uint32_t        nxt_ruby_threads;
85 static nxt_ruby_ctx_t  *nxt_ruby_ctxs;
86 
87 NXT_EXPORT nxt_app_module_t  nxt_app_module = {
88     sizeof(compat),
89     compat,
90     nxt_string("ruby"),
91     ruby_version,
92     nxt_ruby_mounts,
93     nxt_nitems(nxt_ruby_mounts),
94     NULL,
95     nxt_ruby_start,
96 };
97 
98 typedef struct {
99     nxt_str_t  string;
100     VALUE      *v;
101 } nxt_ruby_string_t;
102 
103 static VALUE  nxt_rb_80_str;
104 static VALUE  nxt_rb_content_length_str;
105 static VALUE  nxt_rb_content_type_str;
106 static VALUE  nxt_rb_http_str;
107 static VALUE  nxt_rb_https_str;
108 static VALUE  nxt_rb_path_info_str;
109 static VALUE  nxt_rb_query_string_str;
110 static VALUE  nxt_rb_rack_url_scheme_str;
111 static VALUE  nxt_rb_remote_addr_str;
112 static VALUE  nxt_rb_request_method_str;
113 static VALUE  nxt_rb_request_uri_str;
114 static VALUE  nxt_rb_server_addr_str;
115 static VALUE  nxt_rb_server_name_str;
116 static VALUE  nxt_rb_server_port_str;
117 static VALUE  nxt_rb_server_protocol_str;
118 
119 static nxt_ruby_string_t nxt_rb_strings[] = {
120     { nxt_string("80"), &nxt_rb_80_str },
121     { nxt_string("CONTENT_LENGTH"), &nxt_rb_content_length_str },
122     { nxt_string("CONTENT_TYPE"), &nxt_rb_content_type_str },
123     { nxt_string("http"), &nxt_rb_http_str },
124     { nxt_string("https"), &nxt_rb_https_str },
125     { nxt_string("PATH_INFO"), &nxt_rb_path_info_str },
126     { nxt_string("QUERY_STRING"), &nxt_rb_query_string_str },
127     { nxt_string("rack.url_scheme"), &nxt_rb_rack_url_scheme_str },
128     { nxt_string("REMOTE_ADDR"), &nxt_rb_remote_addr_str },
129     { nxt_string("REQUEST_METHOD"), &nxt_rb_request_method_str },
130     { nxt_string("REQUEST_URI"), &nxt_rb_request_uri_str },
131     { nxt_string("SERVER_ADDR"), &nxt_rb_server_addr_str },
132     { nxt_string("SERVER_NAME"), &nxt_rb_server_name_str },
133     { nxt_string("SERVER_PORT"), &nxt_rb_server_port_str },
134     { nxt_string("SERVER_PROTOCOL"), &nxt_rb_server_protocol_str },
135     { nxt_null_string, NULL },
136 };
137 
138 
139 static int
140 nxt_ruby_init_strings(void)
141 {
142     VALUE              v;
143     nxt_ruby_string_t  *pstr;
144 
145     pstr = nxt_rb_strings;
146 
147     while (pstr->string.start != NULL) {
148         v = rb_str_new_static((char *) pstr->string.start, pstr->string.length);
149 
150         if (nxt_slow_path(v == Qnil)) {
151             nxt_unit_alert(NULL, "Ruby: failed to create const string '%.*s'",
152                            (int) pstr->string.length,
153                            (char *) pstr->string.start);
154 
155             return NXT_UNIT_ERROR;
156         }
157 
158         *pstr->v = v;
159 
160         rb_gc_register_address(pstr->v);
161 
162         pstr++;
163     }
164 
165     return NXT_UNIT_OK;
166 }
167 
168 
169 static void
170 nxt_ruby_done_strings(void)
171 {
172     nxt_ruby_string_t  *pstr;
173 
174     pstr = nxt_rb_strings;
175 
176     while (pstr->string.start != NULL) {
177         rb_gc_unregister_address(pstr->v);
178 
179         *pstr->v = Qnil;
180 
181         pstr++;
182     }
183 }
184 
185 
186 static nxt_int_t
187 nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
188 {
189     int                    state, rc;
190     VALUE                  res;
191     nxt_ruby_ctx_t         ruby_ctx;
192     nxt_unit_ctx_t         *unit_ctx;
193     nxt_unit_init_t        ruby_unit_init;
194     nxt_ruby_app_conf_t    *c;
195     nxt_ruby_rack_init_t   rack_init;
196     nxt_common_app_conf_t  *conf;
197 
198     static char  *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
199 
200     conf = data->app;
201     c = &conf->u.ruby;
202 
203     nxt_ruby_threads = c->threads;
204 
205     setlocale(LC_CTYPE, "");
206 
207     RUBY_INIT_STACK
208     ruby_init();
209     ruby_options(2, argv);
210     ruby_script("NGINX_Unit");
211 
212     ruby_ctx.env = Qnil;
213     ruby_ctx.io_input = Qnil;
214     ruby_ctx.io_error = Qnil;
215     ruby_ctx.thread = Qnil;
216     ruby_ctx.ctx = NULL;
217     ruby_ctx.req = NULL;
218 
219     rack_init.task = task;
220     rack_init.script = &c->script;
221     rack_init.rctx = &ruby_ctx;
222 
223     nxt_ruby_init_strings();
224 
225     res = rb_protect(nxt_ruby_init_basic,
226                      (VALUE) (uintptr_t) &rack_init, &state);
227     if (nxt_slow_path(res == Qnil || state != 0)) {
228         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
229                                "Failed to init basic variables");
230         return NXT_ERROR;
231     }
232 
233     nxt_ruby_call = Qnil;
234 
235     nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
236     if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
237         return NXT_ERROR;
238     }
239 
240     rb_gc_register_address(&nxt_ruby_rackup);
241 
242     nxt_ruby_call = rb_intern("call");
243     if (nxt_slow_path(nxt_ruby_call == Qnil)) {
244         nxt_alert(task, "Ruby: Unable to find rack entry point");
245 
246         goto fail;
247     }
248 
249     rb_gc_register_address(&nxt_ruby_call);
250 
251     ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
252                               (VALUE) (uintptr_t) &ruby_ctx, &state);
253     if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
254         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
255                                "Failed to create 'environ' variable");
256         goto fail;
257     }
258 
259     rc = nxt_ruby_init_threads(c);
260     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
261         goto fail;
262     }
263 
264     nxt_unit_default_init(task, &ruby_unit_init);
265 
266     ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
267     ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
268     ruby_unit_init.shm_limit = conf->shm_limit;
269     ruby_unit_init.data = c;
270     ruby_unit_init.ctx_data = &ruby_ctx;
271 
272     unit_ctx = nxt_unit_init(&ruby_unit_init);
273     if (nxt_slow_path(unit_ctx == NULL)) {
274         goto fail;
275     }
276 
277     rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_unit_run, unit_ctx,
278                                                nxt_ruby_ubf, unit_ctx);
279 
280     nxt_ruby_join_threads(unit_ctx, c);
281 
282     nxt_unit_done(unit_ctx);
283 
284     nxt_ruby_ctx_done(&ruby_ctx);
285 
286     nxt_ruby_atexit();
287 
288     exit(rc);
289 
290     return NXT_OK;
291 
292 fail:
293 
294     nxt_ruby_join_threads(NULL, c);
295 
296     nxt_ruby_ctx_done(&ruby_ctx);
297 
298     nxt_ruby_atexit();
299 
300     return NXT_ERROR;
301 }
302 
303 
304 static VALUE
305 nxt_ruby_init_basic(VALUE arg)
306 {
307     int                   state;
308     nxt_ruby_rack_init_t  *rack_init;
309 
310     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;
311 
312     state = rb_enc_find_index("encdb");
313     if (nxt_slow_path(state == 0)) {
314         nxt_alert(rack_init->task,
315                   "Ruby: Failed to find encoding index 'encdb'");
316 
317         return Qnil;
318     }
319 
320     rb_funcall(rb_cObject, rb_intern("require"), 1,
321                rb_str_new2("enc/trans/transdb"));
322 
323     return arg;
324 }
325 
326 
327 static VALUE
328 nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
329 {
330     int    state;
331     VALUE  rackup, err;
332 
333     rb_protect(nxt_ruby_require_rubygems, Qnil, &state);
334     if (nxt_slow_path(state != 0)) {
335         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
336                                "Failed to require 'rubygems' package");
337         return Qnil;
338     }
339 
340     rb_protect(nxt_ruby_bundler_setup, Qnil, &state);
341     if (state != 0) {
342         err = rb_errinfo();
343 
344         if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) {
345             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
346                                    "Failed to require 'bundler/setup' package");
347             return Qnil;
348         }
349 
350         rb_set_errinfo(Qnil);
351     }
352 
353     rb_protect(nxt_ruby_require_rack, Qnil, &state);
354     if (nxt_slow_path(state != 0)) {
355         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
356                                "Failed to require 'rack' package");
357         return Qnil;
358     }
359 
360     rackup = rb_protect(nxt_ruby_rack_parse_script,
361                         (VALUE) (uintptr_t) rack_init, &state);
362     if (nxt_slow_path(TYPE(rackup) != T_ARRAY || state != 0)) {
363         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
364                                "Failed to parse rack script");
365         return Qnil;
366     }
367 
368     if (nxt_slow_path(RARRAY_LEN(rackup) < 1)) {
369         nxt_alert(rack_init->task, "Ruby: Invalid rack config file");
370         return Qnil;
371     }
372 
373     return RARRAY_PTR(rackup)[0];
374 }
375 
376 
377 static VALUE
378 nxt_ruby_require_rubygems(VALUE arg)
379 {
380     return rb_funcall(rb_cObject, rb_intern("require"), 1,
381                       rb_str_new2("rubygems"));
382 }
383 
384 
385 static VALUE
386 nxt_ruby_bundler_setup(VALUE arg)
387 {
388     return rb_funcall(rb_cObject, rb_intern("require"), 1,
389                       rb_str_new2("bundler/setup"));
390 }
391 
392 
393 static VALUE
394 nxt_ruby_require_rack(VALUE arg)
395 {
396     return rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("rack"));
397 }
398 
399 
400 static VALUE
401 nxt_ruby_rack_parse_script(VALUE ctx)
402 {
403     VALUE                 script, res, rack, builder;
404     nxt_ruby_rack_init_t  *rack_init;
405 
406     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;
407 
408     rack = rb_const_get(rb_cObject, rb_intern("Rack"));
409     builder = rb_const_get(rack, rb_intern("Builder"));
410 
411     script = rb_str_new((const char *) rack_init->script->start,
412                         (long) rack_init->script->length);
413 
414     res = rb_funcall(builder, rb_intern("parse_file"), 1, script);
415 
416     rb_str_free(script);
417 
418     return res;
419 }
420 
421 
422 static VALUE
423 nxt_ruby_rack_env_create(VALUE arg)
424 {
425     int             rc;
426     VALUE           hash_env, version;
427     nxt_ruby_ctx_t  *rctx;
428 
429     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
430 
431     rc = nxt_ruby_init_io(rctx);
432     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
433         return Qnil;
434     }
435 
436     hash_env = rb_hash_new();
437 
438     rb_hash_aset(hash_env, rb_str_new2("SERVER_SOFTWARE"),
439                  rb_str_new((const char *) nxt_server.start,
440                             (long) nxt_server.length));
441 
442     version = rb_ary_new();
443 
444     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR));
445     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));
446 
447     rb_hash_aset(hash_env, rb_str_new2("rack.version"), version);
448     rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input);
449     rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error);
450     rb_hash_aset(hash_env, rb_str_new2("rack.multithread"),
451                  nxt_ruby_threads > 1 ? Qtrue : Qfalse);
452     rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue);
453     rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse);
454     rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse);
455     rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil);
456     rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil);
457 
458     rctx->env = hash_env;
459 
460     rb_gc_register_address(&rctx->env);
461 
462     return hash_env;
463 }
464 
465 
466 static int
467 nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
468 {
469     VALUE  io_input, io_error;
470 
471     io_input = nxt_ruby_stream_io_input_init();
472 
473     rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1,
474                                    (VALUE) (uintptr_t) rctx);
475     if (nxt_slow_path(rctx->io_input == Qnil)) {
476         nxt_unit_alert(NULL,
477                        "Ruby: Failed to create environment 'rack.input' var");
478 
479         return NXT_UNIT_ERROR;
480     }
481 
482     rb_gc_register_address(&rctx->io_input);
483 
484     io_error = nxt_ruby_stream_io_error_init();
485 
486     rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1,
487                                    (VALUE) (uintptr_t) rctx);
488     if (nxt_slow_path(rctx->io_error == Qnil)) {
489         nxt_unit_alert(NULL,
490                        "Ruby: Failed to create environment 'rack.error' var");
491 
492         return NXT_UNIT_ERROR;
493     }
494 
495     rb_gc_register_address(&rctx->io_error);
496 
497     return NXT_UNIT_OK;
498 }
499 
500 
501 static void
502 nxt_ruby_request_handler(nxt_unit_request_info_t *req)
503 {
504     (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);
505 }
506 
507 
508 static void *
509 nxt_ruby_request_handler_gvl(void *data)
510 {
511     int                      state;
512     VALUE                    res;
513     nxt_ruby_ctx_t           *rctx;
514     nxt_unit_request_info_t  *req;
515 
516     req = data;
517 
518     rctx = req->ctx->data;
519     rctx->req = req;
520 
521     res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);
522     if (nxt_slow_path(res == Qnil || state != 0)) {
523         nxt_ruby_exception_log(req, NXT_LOG_ERR,
524                                "Failed to run ruby script");
525 
526         nxt_unit_request_done(req, NXT_UNIT_ERROR);
527 
528     } else {
529         nxt_unit_request_done(req, NXT_UNIT_OK);
530     }
531 
532     rctx->req = NULL;
533 
534     return NULL;
535 }
536 
537 
538 static VALUE
539 nxt_ruby_rack_app_run(VALUE arg)
540 {
541     int                      rc;
542     VALUE                    env, result;
543     nxt_int_t                status;
544     nxt_ruby_ctx_t           *rctx;
545     nxt_unit_request_info_t  *req;
546 
547     req = (nxt_unit_request_info_t *) arg;
548 
549     rctx = req->ctx->data;
550 
551     env = rb_hash_dup(rctx->env);
552 
553     rc = nxt_ruby_read_request(req, env);
554     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
555         nxt_unit_req_alert(req,
556                            "Ruby: Failed to process incoming request");
557 
558         goto fail;
559     }
560 
561     result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
562     if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
563         nxt_unit_req_error(req,
564                            "Ruby: Invalid response format from application");
565 
566         goto fail;
567     }
568 
569     if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
570         nxt_unit_req_error(req,
571                            "Ruby: Invalid response format from application. "
572                            "Need 3 entries [Status, Headers, Body]");
573 
574         goto fail;
575     }
576 
577     status = nxt_ruby_rack_result_status(req, result);
578     if (nxt_slow_path(status < 0)) {
579         nxt_unit_req_error(req,
580                            "Ruby: Invalid response status from application.");
581 
582         goto fail;
583     }
584 
585     rc = nxt_ruby_rack_result_headers(req, result, status);
586     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
587         goto fail;
588     }
589 
590     rc = nxt_ruby_rack_result_body(req, result);
591     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
592         goto fail;
593     }
594 
595     rb_hash_delete(env, rb_obj_id(env));
596 
597     return result;
598 
599 fail:
600 
601     rb_hash_delete(env, rb_obj_id(env));
602 
603     return Qnil;
604 }
605 
606 
607 static int
608 nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
609 {
610     VALUE               name;
611     uint32_t            i;
612     nxt_unit_field_t    *f;
613     nxt_unit_request_t  *r;
614 
615     r = req->request;
616 
617     nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
618                       r->method_length);
619     nxt_ruby_add_sptr(hash_env, nxt_rb_request_uri_str, &r->target,
620                       r->target_length);
621     nxt_ruby_add_sptr(hash_env, nxt_rb_path_info_str, &r->path, r->path_length);
622     nxt_ruby_add_sptr(hash_env, nxt_rb_query_string_str, &r->query,
623                       r->query_length);
624     nxt_ruby_add_sptr(hash_env, nxt_rb_server_protocol_str, &r->version,
625                       r->version_length);
626     nxt_ruby_add_sptr(hash_env, nxt_rb_remote_addr_str, &r->remote,
627                       r->remote_length);
628     nxt_ruby_add_sptr(hash_env, nxt_rb_server_addr_str, &r->local,
629                       r->local_length);
630     nxt_ruby_add_sptr(hash_env, nxt_rb_server_name_str, &r->server_name,
631                       r->server_name_length);
632 
633     rb_hash_aset(hash_env, nxt_rb_server_port_str, nxt_rb_80_str);
634 
635     rb_hash_aset(hash_env, nxt_rb_rack_url_scheme_str,
636                  r->tls ? nxt_rb_https_str : nxt_rb_http_str);
637 
638     for (i = 0; i < r->fields_count; i++) {
639         f = r->fields + i;
640 
641         name = rb_str_new(nxt_unit_sptr_get(&f->name), f->name_length);
642 
643         nxt_ruby_add_sptr(hash_env, name, &f->value, f->value_length);
644     }
645 
646     if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
647         f = r->fields + r->content_length_field;
648 
649         nxt_ruby_add_sptr(hash_env, nxt_rb_content_length_str,
650                           &f->value, f->value_length);
651     }
652 
653     if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
654         f = r->fields + r->content_type_field;
655 
656         nxt_ruby_add_sptr(hash_env, nxt_rb_content_type_str,
657                           &f->value, f->value_length);
658     }
659 
660     return NXT_UNIT_OK;
661 }
662 
663 
664 nxt_inline void
665 nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
666     nxt_unit_sptr_t *sptr, uint32_t len)
667 {
668     char  *str;
669 
670     str = nxt_unit_sptr_get(sptr);
671 
672     rb_hash_aset(hash_env, name, rb_str_new(str, len));
673 }
674 
675 
676 static nxt_int_t
677 nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
678 {
679     VALUE   status;
680 
681     status = rb_ary_entry(result, 0);
682 
683     if (TYPE(status) == T_FIXNUM) {
684         return FIX2INT(status);
685     }
686 
687     if (TYPE(status) == T_STRING) {
688         return nxt_int_parse((u_char *) RSTRING_PTR(status),
689                              RSTRING_LEN(status));
690     }
691 
692     nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
693                        "format from application");
694 
695     return -2;
696 }
697 
698 
699 typedef struct {
700     int                      rc;
701     uint32_t                 fields;
702     uint32_t                 size;
703     nxt_unit_request_info_t  *req;
704 } nxt_ruby_headers_info_t;
705 
706 
707 static int
708 nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
709     nxt_int_t status)
710 {
711     int                      rc;
712     VALUE                    headers;
713     nxt_ruby_headers_info_t  headers_info;
714 
715     headers = rb_ary_entry(result, 1);
716     if (nxt_slow_path(TYPE(headers) != T_HASH)) {
717         nxt_unit_req_error(req,
718                            "Ruby: Invalid response 'headers' format from "
719                            "application");
720 
721         return NXT_UNIT_ERROR;
722     }
723 
724     rc = NXT_UNIT_OK;
725 
726     headers_info.rc = NXT_UNIT_OK;
727     headers_info.fields = 0;
728     headers_info.size = 0;
729     headers_info.req = req;
730 
731     rb_hash_foreach(headers, nxt_ruby_hash_info,
732                     (VALUE) (uintptr_t) &headers_info);
733     if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
734         return headers_info.rc;
735     }
736 
737     rc = nxt_unit_response_init(req, status,
738                                 headers_info.fields, headers_info.size);
739     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
740         return rc;
741     }
742 
743     rb_hash_foreach(headers, nxt_ruby_hash_add,
744                     (VALUE) (uintptr_t) &headers_info);
745 
746     return rc;
747 }
748 
749 
750 static int
751 nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
752 {
753     const char               *value, *value_end, *pos;
754     nxt_ruby_headers_info_t  *headers_info;
755 
756     headers_info = (void *) (uintptr_t) arg;
757 
758     if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
759         nxt_unit_req_error(headers_info->req,
760                            "Ruby: Wrong header entry 'key' from application");
761 
762         goto fail;
763     }
764 
765     if (nxt_slow_path(TYPE(r_value) != T_STRING)) {
766         nxt_unit_req_error(headers_info->req,
767                            "Ruby: Wrong header entry 'value' from application");
768 
769         goto fail;
770     }
771 
772     value = RSTRING_PTR(r_value);
773     value_end = value + RSTRING_LEN(r_value);
774 
775     pos = value;
776 
777     for ( ;; ) {
778         pos = strchr(pos, '\n');
779 
780         if (pos == NULL) {
781             break;
782         }
783 
784         headers_info->fields++;
785         headers_info->size += RSTRING_LEN(r_key) + (pos - value);
786 
787         pos++;
788         value = pos;
789     }
790 
791     if (value <= value_end) {
792         headers_info->fields++;
793         headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
794     }
795 
796     return ST_CONTINUE;
797 
798 fail:
799 
800     headers_info->rc = NXT_UNIT_ERROR;
801 
802     return ST_STOP;
803 }
804 
805 
806 static int
807 nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
808 {
809     int                      *rc;
810     uint32_t                 key_len;
811     const char               *value, *value_end, *pos;
812     nxt_ruby_headers_info_t  *headers_info;
813 
814     headers_info = (void *) (uintptr_t) arg;
815     rc = &headers_info->rc;
816 
817     value = RSTRING_PTR(r_value);
818     value_end = value + RSTRING_LEN(r_value);
819 
820     key_len = RSTRING_LEN(r_key);
821 
822     pos = value;
823 
824     for ( ;; ) {
825         pos = strchr(pos, '\n');
826 
827         if (pos == NULL) {
828             break;
829         }
830 
831         *rc = nxt_unit_response_add_field(headers_info->req,
832                                           RSTRING_PTR(r_key), key_len,
833                                           value, pos - value);
834         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
835             goto fail;
836         }
837 
838         pos++;
839         value = pos;
840     }
841 
842     if (value <= value_end) {
843         *rc = nxt_unit_response_add_field(headers_info->req,
844                                           RSTRING_PTR(r_key), key_len,
845                                           value, value_end - value);
846         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
847             goto fail;
848         }
849     }
850 
851     return ST_CONTINUE;
852 
853 fail:
854 
855     *rc = NXT_UNIT_ERROR;
856 
857     return ST_STOP;
858 }
859 
860 
861 static int
862 nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
863 {
864     int    rc;
865     VALUE  fn, body;
866 
867     body = rb_ary_entry(result, 2);
868 
869     if (rb_respond_to(body, rb_intern("to_path"))) {
870 
871         fn = rb_funcall(body, rb_intern("to_path"), 0);
872         if (nxt_slow_path(TYPE(fn) != T_STRING)) {
873             nxt_unit_req_error(req,
874                                "Ruby: Failed to get 'body' file path from "
875                                "application");
876 
877             return NXT_UNIT_ERROR;
878         }
879 
880         rc = nxt_ruby_rack_result_body_file_write(req, fn);
881         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
882             return rc;
883         }
884 
885     } else if (rb_respond_to(body, rb_intern("each"))) {
886         rb_block_call(body, rb_intern("each"), 0, 0,
887                       nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);
888 
889     } else {
890         nxt_unit_req_error(req,
891                            "Ruby: Invalid response 'body' format "
892                            "from application");
893 
894         return NXT_UNIT_ERROR;
895     }
896 
897     if (rb_respond_to(body, rb_intern("close"))) {
898         rb_funcall(body, rb_intern("close"), 0);
899     }
900 
901     return NXT_UNIT_OK;
902 }
903 
904 
905 typedef struct {
906     int    fd;
907     off_t  pos;
908     off_t  rest;
909 } nxt_ruby_rack_file_t;
910 
911 
912 static ssize_t
913 nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
914 {
915     ssize_t               res;
916     nxt_ruby_rack_file_t  *file;
917 
918     file = read_info->data;
919 
920     size = nxt_min(size, (size_t) file->rest);
921 
922     res = pread(file->fd, dst, size, file->pos);
923 
924     if (res >= 0) {
925         file->pos += res;
926         file->rest -= res;
927 
928         if (size > (size_t) res) {
929             file->rest = 0;
930         }
931     }
932 
933     read_info->eof = file->rest == 0;
934 
935     return res;
936 }
937 
938 
939 typedef struct {
940     nxt_unit_read_info_t     read_info;
941     nxt_unit_request_info_t  *req;
942 } nxt_ruby_read_info_t;
943 
944 
945 static int
946 nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
947     VALUE filepath)
948 {
949     int                   fd, rc;
950     struct stat           finfo;
951     nxt_ruby_rack_file_t  ruby_file;
952     nxt_ruby_read_info_t  ri;
953 
954     fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
955     if (nxt_slow_path(fd == -1)) {
956         nxt_unit_req_error(req,
957                            "Ruby: Failed to open content file \"%s\": %s (%d)",
958                            RSTRING_PTR(filepath), strerror(errno), errno);
959 
960         return NXT_UNIT_ERROR;
961     }
962 
963     rc = fstat(fd, &finfo);
964     if (nxt_slow_path(rc == -1)) {
965         nxt_unit_req_error(req,
966                            "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
967                            RSTRING_PTR(filepath), strerror(errno), errno);
968 
969         close(fd);
970 
971         return NXT_UNIT_ERROR;
972     }
973 
974     ruby_file.fd = fd;
975     ruby_file.pos = 0;
976     ruby_file.rest = finfo.st_size;
977 
978     ri.read_info.read = nxt_ruby_rack_file_read;
979     ri.read_info.eof = ruby_file.rest == 0;
980     ri.read_info.buf_size = ruby_file.rest;
981     ri.read_info.data = &ruby_file;
982     ri.req = req;
983 
984     rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
985                                                &ri,
986                                                nxt_ruby_ubf,
987                                                req->ctx);
988 
989     close(fd);
990 
991     return rc;
992 }
993 
994 
995 static void *
996 nxt_ruby_response_write_cb(void *data)
997 {
998     int                   rc;
999     nxt_ruby_read_info_t  *ri;
1000 
1001     ri = data;
1002 
1003     rc = nxt_unit_response_write_cb(ri->req, &ri->read_info);
1004     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1005         nxt_unit_req_error(ri->req, "Ruby: Failed to write content file.");
1006     }
1007 
1008     return (void *) (intptr_t) rc;
1009 }
1010 
1011 
1012 typedef struct {
1013     VALUE                    body;
1014     nxt_unit_request_info_t  *req;
1015 } nxt_ruby_write_info_t;
1016 
1017 
1018 static VALUE
1019 nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
1020     const VALUE *argv, VALUE blockarg)
1021 {
1022     nxt_ruby_write_info_t  wi;
1023 
1024     if (TYPE(body) != T_STRING) {
1025         return Qnil;
1026     }
1027 
1028     wi.body = body;
1029     wi.req = (void *) (uintptr_t) arg;
1030 
1031     (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
1032                                       (void *) (uintptr_t) &wi,
1033                                       nxt_ruby_ubf, wi.req->ctx);
1034 
1035     return Qnil;
1036 }
1037 
1038 
1039 static void *
1040 nxt_ruby_response_write(void *data)
1041 {
1042     int                    rc;
1043     nxt_ruby_write_info_t  *wi;
1044 
1045     wi = data;
1046 
1047     rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body),
1048                                  RSTRING_LEN(wi->body));
1049     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1050         nxt_unit_req_error(wi->req,
1051                            "Ruby: Failed to write 'body' from application");
1052     }
1053 
1054     return (void *) (intptr_t) rc;
1055 }
1056 
1057 
1058 static void
1059 nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
1060     const char *desc)
1061 {
1062     int    i;
1063     VALUE  err, ary, eclass, msg;
1064 
1065     nxt_unit_req_log(req, level, "Ruby: %s", desc);
1066 
1067     err = rb_errinfo();
1068     if (nxt_slow_path(err == Qnil)) {
1069         return;
1070     }
1071 
1072     ary = rb_funcall(err, rb_intern("backtrace"), 0);
1073     if (nxt_slow_path(RARRAY_LEN(ary) == 0)) {
1074         return;
1075     }
1076 
1077     eclass = rb_class_name(rb_class_of(err));
1078     msg = rb_funcall(err, rb_intern("message"), 0);
1079 
1080     nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
1081                      RSTRING_PTR(RARRAY_PTR(ary)[0]),
1082                      RSTRING_PTR(msg), RSTRING_PTR(eclass));
1083 
1084     for (i = 1; i < RARRAY_LEN(ary); i++) {
1085         nxt_unit_req_log(req, level, "from %s",
1086                          RSTRING_PTR(RARRAY_PTR(ary)[i]));
1087     }
1088 }
1089 
1090 
1091 static void
1092 nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
1093 {
1094     if (rctx->io_input != Qnil) {
1095         rb_gc_unregister_address(&rctx->io_input);
1096     }
1097 
1098     if (rctx->io_error != Qnil) {
1099         rb_gc_unregister_address(&rctx->io_error);
1100     }
1101 
1102     if (rctx->env != Qnil) {
1103         rb_gc_unregister_address(&rctx->env);
1104     }
1105 }
1106 
1107 
1108 static void
1109 nxt_ruby_atexit(void)
1110 {
1111     if (nxt_ruby_rackup != Qnil) {
1112         rb_gc_unregister_address(&nxt_ruby_rackup);
1113     }
1114 
1115     if (nxt_ruby_call != Qnil) {
1116         rb_gc_unregister_address(&nxt_ruby_call);
1117     }
1118 
1119     nxt_ruby_done_strings();
1120 
1121     ruby_cleanup(0);
1122 }
1123 
1124 
1125 static int
1126 nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
1127 {
1128     VALUE                res;
1129     uint32_t             i;
1130     nxt_ruby_ctx_t       *rctx;
1131     nxt_ruby_app_conf_t  *c;
1132 
1133     /* Worker thread context. */
1134     if (!nxt_unit_is_main_ctx(ctx)) {
1135         return NXT_UNIT_OK;
1136     }
1137 
1138     c = ctx->unit->data;
1139 
1140     if (c->threads <= 1) {
1141         return NXT_UNIT_OK;
1142     }
1143 
1144     for (i = 0; i < c->threads - 1; i++) {
1145         rctx = &nxt_ruby_ctxs[i];
1146 
1147         rctx->ctx = ctx;
1148 
1149         res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx);
1150 
1151         if (nxt_fast_path(res != Qnil)) {
1152             nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
1153 
1154             rctx->thread = res;
1155 
1156         } else {
1157             nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1));
1158 
1159             return NXT_UNIT_ERROR;
1160         }
1161     }
1162 
1163     return NXT_UNIT_OK;
1164 }
1165 
1166 
1167 static void *
1168 nxt_ruby_thread_create_gvl(void *rctx)
1169 {
1170     VALUE  res;
1171 
1172     res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
1173 
1174     return (void *) (uintptr_t) res;
1175 }
1176 
1177 
1178 static VALUE
1179 nxt_ruby_thread_func(VALUE arg)
1180 {
1181     nxt_unit_ctx_t  *ctx;
1182     nxt_ruby_ctx_t  *rctx;
1183 
1184     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
1185 
1186     nxt_unit_debug(rctx->ctx, "worker thread start");
1187 
1188     ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);
1189     if (nxt_slow_path(ctx == NULL)) {
1190         goto fail;
1191     }
1192 
1193     (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
1194                                       nxt_ruby_ubf, ctx);
1195 
1196     nxt_unit_done(ctx);
1197 
1198 fail:
1199 
1200     nxt_unit_debug(NULL, "worker thread end");
1201 
1202     return Qnil;
1203 }
1204 
1205 
1206 static void *
1207 nxt_ruby_unit_run(void *ctx)
1208 {
1209     return (void *) (intptr_t) nxt_unit_run(ctx);
1210 }
1211 
1212 
1213 static void
1214 nxt_ruby_ubf(void *ctx)
1215 {
1216     nxt_unit_warn(ctx, "Ruby: UBF");
1217 }
1218 
1219 
1220 static int
1221 nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
1222 {
1223     int             state;
1224     uint32_t        i;
1225     nxt_ruby_ctx_t  *rctx;
1226 
1227     if (c->threads <= 1) {
1228         return NXT_UNIT_OK;
1229     }
1230 
1231     nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
1232                                           * (c->threads - 1));
1233     if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
1234         nxt_unit_alert(NULL, "Failed to allocate run contexts array");
1235 
1236         return NXT_UNIT_ERROR;
1237     }
1238 
1239     for (i = 0; i < c->threads - 1; i++) {
1240         rctx = &nxt_ruby_ctxs[i];
1241 
1242         rctx->env = Qnil;
1243         rctx->io_input = Qnil;
1244         rctx->io_error = Qnil;
1245         rctx->thread = Qnil;
1246     }
1247 
1248     for (i = 0; i < c->threads - 1; i++) {
1249         rctx = &nxt_ruby_ctxs[i];
1250 
1251         rctx->env = rb_protect(nxt_ruby_rack_env_create,
1252                                (VALUE) (uintptr_t) rctx, &state);
1253         if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
1254             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
1255                                    "Failed to create 'environ' variable");
1256             return NXT_UNIT_ERROR;
1257         }
1258     }
1259 
1260     return NXT_UNIT_OK;
1261 }
1262 
1263 
1264 static void
1265 nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
1266 {
1267     uint32_t        i;
1268     nxt_ruby_ctx_t  *rctx;
1269 
1270     if (nxt_ruby_ctxs == NULL) {
1271         return;
1272     }
1273 
1274     for (i = 0; i < c->threads - 1; i++) {
1275         rctx = &nxt_ruby_ctxs[i];
1276 
1277         if (rctx->thread != Qnil) {
1278             rb_funcall(rctx->thread, rb_intern("join"), 0);
1279 
1280             nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
1281 
1282         } else {
1283             nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
1284         }
1285     }
1286 
1287     for (i = 0; i < c->threads - 1; i++) {
1288         nxt_ruby_ctx_done(&nxt_ruby_ctxs[i]);
1289     }
1290 
1291     nxt_unit_free(ctx, nxt_ruby_ctxs);
1292 }
1293