xref: /unit/src/ruby/nxt_ruby.c (revision 1738:4a7bb9e7678a)
1 /*
2  * Copyright (C) Alexander Borisov
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <ruby/nxt_ruby.h>
7 
8 #include <nxt_unit.h>
9 #include <nxt_unit_request.h>
10 
11 #include <ruby/thread.h>
12 
13 #include NXT_RUBY_MOUNTS_H
14 
15 
16 #define NXT_RUBY_RACK_API_VERSION_MAJOR  1
17 #define NXT_RUBY_RACK_API_VERSION_MINOR  3
18 
19 
20 typedef struct {
21     nxt_task_t      *task;
22     nxt_str_t       *script;
23     nxt_ruby_ctx_t  *rctx;
24 } nxt_ruby_rack_init_t;
25 
26 
27 static nxt_int_t nxt_ruby_start(nxt_task_t *task,
28     nxt_process_data_t *data);
29 static VALUE nxt_ruby_init_basic(VALUE arg);
30 static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
31 
32 static VALUE nxt_ruby_require_rubygems(VALUE arg);
33 static VALUE nxt_ruby_bundler_setup(VALUE arg);
34 static VALUE nxt_ruby_require_rack(VALUE arg);
35 static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
36 static VALUE nxt_ruby_rack_env_create(VALUE arg);
37 static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
38 static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
39 static void *nxt_ruby_request_handler_gvl(void *req);
40 static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
41 static void *nxt_ruby_thread_create_gvl(void *rctx);
42 static VALUE nxt_ruby_thread_func(VALUE arg);
43 static void *nxt_ruby_unit_run(void *ctx);
44 static void nxt_ruby_ubf(void *ctx);
45 static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
46 static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
47     nxt_ruby_app_conf_t *c);
48 
49 static VALUE nxt_ruby_rack_app_run(VALUE arg);
50 static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
51 nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
52     nxt_unit_sptr_t *sptr, uint32_t len);
53 static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
54     VALUE result);
55 static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
56     VALUE result, nxt_int_t status);
57 static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
58 static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
59 static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
60     VALUE result);
61 static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
62     VALUE filepath);
63 static void *nxt_ruby_response_write_cb(void *read_info);
64 static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
65     int argc, const VALUE *argv, VALUE blockarg);
66 static void *nxt_ruby_response_write(void *body);
67 
68 static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
69     uint32_t level, const char *desc);
70 
71 static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
72 static void nxt_ruby_atexit(void);
73 
74 
75 static uint32_t  compat[] = {
76     NXT_VERNUM, NXT_DEBUG,
77 };
78 
79 static VALUE  nxt_ruby_rackup;
80 static VALUE  nxt_ruby_call;
81 
82 static uint32_t        nxt_ruby_threads;
83 static nxt_ruby_ctx_t  *nxt_ruby_ctxs;
84 
85 NXT_EXPORT nxt_app_module_t  nxt_app_module = {
86     sizeof(compat),
87     compat,
88     nxt_string("ruby"),
89     ruby_version,
90     nxt_ruby_mounts,
91     nxt_nitems(nxt_ruby_mounts),
92     NULL,
93     nxt_ruby_start,
94 };
95 
96 typedef struct {
97     nxt_str_t  string;
98     VALUE      *v;
99 } nxt_ruby_string_t;
100 
101 static VALUE  nxt_rb_80_str;
102 static VALUE  nxt_rb_content_length_str;
103 static VALUE  nxt_rb_content_type_str;
104 static VALUE  nxt_rb_http_str;
105 static VALUE  nxt_rb_https_str;
106 static VALUE  nxt_rb_path_info_str;
107 static VALUE  nxt_rb_query_string_str;
108 static VALUE  nxt_rb_rack_url_scheme_str;
109 static VALUE  nxt_rb_remote_addr_str;
110 static VALUE  nxt_rb_request_method_str;
111 static VALUE  nxt_rb_request_uri_str;
112 static VALUE  nxt_rb_server_addr_str;
113 static VALUE  nxt_rb_server_name_str;
114 static VALUE  nxt_rb_server_port_str;
115 static VALUE  nxt_rb_server_protocol_str;
116 
117 static nxt_ruby_string_t nxt_rb_strings[] = {
118     { nxt_string("80"), &nxt_rb_80_str },
119     { nxt_string("CONTENT_LENGTH"), &nxt_rb_content_length_str },
120     { nxt_string("CONTENT_TYPE"), &nxt_rb_content_type_str },
121     { nxt_string("http"), &nxt_rb_http_str },
122     { nxt_string("https"), &nxt_rb_https_str },
123     { nxt_string("PATH_INFO"), &nxt_rb_path_info_str },
124     { nxt_string("QUERY_STRING"), &nxt_rb_query_string_str },
125     { nxt_string("rack.url_scheme"), &nxt_rb_rack_url_scheme_str },
126     { nxt_string("REMOTE_ADDR"), &nxt_rb_remote_addr_str },
127     { nxt_string("REQUEST_METHOD"), &nxt_rb_request_method_str },
128     { nxt_string("REQUEST_URI"), &nxt_rb_request_uri_str },
129     { nxt_string("SERVER_ADDR"), &nxt_rb_server_addr_str },
130     { nxt_string("SERVER_NAME"), &nxt_rb_server_name_str },
131     { nxt_string("SERVER_PORT"), &nxt_rb_server_port_str },
132     { nxt_string("SERVER_PROTOCOL"), &nxt_rb_server_protocol_str },
133     { nxt_null_string, NULL },
134 };
135 
136 
137 static int
138 nxt_ruby_init_strings(void)
139 {
140     VALUE              v;
141     nxt_ruby_string_t  *pstr;
142 
143     pstr = nxt_rb_strings;
144 
145     while (pstr->string.start != NULL) {
146         v = rb_str_new_static((char *) pstr->string.start, pstr->string.length);
147 
148         if (nxt_slow_path(v == Qnil)) {
149             nxt_unit_alert(NULL, "Ruby: failed to create const string '%.*s'",
150                            (int) pstr->string.length,
151                            (char *) pstr->string.start);
152 
153             return NXT_UNIT_ERROR;
154         }
155 
156         *pstr->v = v;
157 
158         rb_gc_register_address(pstr->v);
159 
160         pstr++;
161     }
162 
163     return NXT_UNIT_OK;
164 }
165 
166 
167 static void
168 nxt_ruby_done_strings(void)
169 {
170     nxt_ruby_string_t  *pstr;
171 
172     pstr = nxt_rb_strings;
173 
174     while (pstr->string.start != NULL) {
175         rb_gc_unregister_address(pstr->v);
176 
177         *pstr->v = Qnil;
178 
179         pstr++;
180     }
181 }
182 
183 
184 static nxt_int_t
185 nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
186 {
187     int                    state, rc;
188     VALUE                  res;
189     nxt_ruby_ctx_t         ruby_ctx;
190     nxt_unit_ctx_t         *unit_ctx;
191     nxt_unit_init_t        ruby_unit_init;
192     nxt_ruby_app_conf_t    *c;
193     nxt_ruby_rack_init_t   rack_init;
194     nxt_common_app_conf_t  *conf;
195 
196     static char  *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
197 
198     conf = data->app;
199     c = &conf->u.ruby;
200 
201     nxt_ruby_threads = c->threads;
202 
203     RUBY_INIT_STACK
204     ruby_init();
205     ruby_options(2, argv);
206     ruby_script("NGINX_Unit");
207 
208     ruby_ctx.env = Qnil;
209     ruby_ctx.io_input = Qnil;
210     ruby_ctx.io_error = Qnil;
211     ruby_ctx.thread = Qnil;
212     ruby_ctx.ctx = NULL;
213     ruby_ctx.req = NULL;
214 
215     rack_init.task = task;
216     rack_init.script = &c->script;
217     rack_init.rctx = &ruby_ctx;
218 
219     nxt_ruby_init_strings();
220 
221     res = rb_protect(nxt_ruby_init_basic,
222                      (VALUE) (uintptr_t) &rack_init, &state);
223     if (nxt_slow_path(res == Qnil || state != 0)) {
224         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
225                                "Failed to init basic variables");
226         return NXT_ERROR;
227     }
228 
229     nxt_ruby_call = Qnil;
230 
231     nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
232     if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
233         return NXT_ERROR;
234     }
235 
236     rb_gc_register_address(&nxt_ruby_rackup);
237 
238     nxt_ruby_call = rb_intern("call");
239     if (nxt_slow_path(nxt_ruby_call == Qnil)) {
240         nxt_alert(task, "Ruby: Unable to find rack entry point");
241 
242         goto fail;
243     }
244 
245     rb_gc_register_address(&nxt_ruby_call);
246 
247     ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
248                               (VALUE) (uintptr_t) &ruby_ctx, &state);
249     if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
250         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
251                                "Failed to create 'environ' variable");
252         goto fail;
253     }
254 
255     rc = nxt_ruby_init_threads(c);
256     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
257         goto fail;
258     }
259 
260     nxt_unit_default_init(task, &ruby_unit_init);
261 
262     ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
263     ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
264     ruby_unit_init.shm_limit = conf->shm_limit;
265     ruby_unit_init.data = c;
266     ruby_unit_init.ctx_data = &ruby_ctx;
267 
268     unit_ctx = nxt_unit_init(&ruby_unit_init);
269     if (nxt_slow_path(unit_ctx == NULL)) {
270         goto fail;
271     }
272 
273     rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_unit_run, unit_ctx,
274                                                nxt_ruby_ubf, unit_ctx);
275 
276     nxt_ruby_join_threads(unit_ctx, c);
277 
278     nxt_unit_done(unit_ctx);
279 
280     nxt_ruby_ctx_done(&ruby_ctx);
281 
282     nxt_ruby_atexit();
283 
284     exit(rc);
285 
286     return NXT_OK;
287 
288 fail:
289 
290     nxt_ruby_join_threads(NULL, c);
291 
292     nxt_ruby_ctx_done(&ruby_ctx);
293 
294     nxt_ruby_atexit();
295 
296     return NXT_ERROR;
297 }
298 
299 
300 static VALUE
301 nxt_ruby_init_basic(VALUE arg)
302 {
303     int                   state;
304     nxt_ruby_rack_init_t  *rack_init;
305 
306     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;
307 
308     state = rb_enc_find_index("encdb");
309     if (nxt_slow_path(state == 0)) {
310         nxt_alert(rack_init->task,
311                   "Ruby: Failed to find encoding index 'encdb'");
312 
313         return Qnil;
314     }
315 
316     rb_funcall(rb_cObject, rb_intern("require"), 1,
317                rb_str_new2("enc/trans/transdb"));
318 
319     return arg;
320 }
321 
322 
323 static VALUE
324 nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
325 {
326     int    state;
327     VALUE  rackup, err;
328 
329     rb_protect(nxt_ruby_require_rubygems, Qnil, &state);
330     if (nxt_slow_path(state != 0)) {
331         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
332                                "Failed to require 'rubygems' package");
333         return Qnil;
334     }
335 
336     rb_protect(nxt_ruby_bundler_setup, Qnil, &state);
337     if (state != 0) {
338         err = rb_errinfo();
339 
340         if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) {
341             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
342                                    "Failed to require 'bundler/setup' package");
343             return Qnil;
344         }
345 
346         rb_set_errinfo(Qnil);
347     }
348 
349     rb_protect(nxt_ruby_require_rack, Qnil, &state);
350     if (nxt_slow_path(state != 0)) {
351         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
352                                "Failed to require 'rack' package");
353         return Qnil;
354     }
355 
356     rackup = rb_protect(nxt_ruby_rack_parse_script,
357                         (VALUE) (uintptr_t) rack_init, &state);
358     if (nxt_slow_path(TYPE(rackup) != T_ARRAY || state != 0)) {
359         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
360                                "Failed to parse rack script");
361         return Qnil;
362     }
363 
364     if (nxt_slow_path(RARRAY_LEN(rackup) < 1)) {
365         nxt_alert(rack_init->task, "Ruby: Invalid rack config file");
366         return Qnil;
367     }
368 
369     return RARRAY_PTR(rackup)[0];
370 }
371 
372 
373 static VALUE
374 nxt_ruby_require_rubygems(VALUE arg)
375 {
376     return rb_funcall(rb_cObject, rb_intern("require"), 1,
377                       rb_str_new2("rubygems"));
378 }
379 
380 
381 static VALUE
382 nxt_ruby_bundler_setup(VALUE arg)
383 {
384     return rb_funcall(rb_cObject, rb_intern("require"), 1,
385                       rb_str_new2("bundler/setup"));
386 }
387 
388 
389 static VALUE
390 nxt_ruby_require_rack(VALUE arg)
391 {
392     return rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("rack"));
393 }
394 
395 
396 static VALUE
397 nxt_ruby_rack_parse_script(VALUE ctx)
398 {
399     VALUE                 script, res, rack, builder;
400     nxt_ruby_rack_init_t  *rack_init;
401 
402     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;
403 
404     rack = rb_const_get(rb_cObject, rb_intern("Rack"));
405     builder = rb_const_get(rack, rb_intern("Builder"));
406 
407     script = rb_str_new((const char *) rack_init->script->start,
408                         (long) rack_init->script->length);
409 
410     res = rb_funcall(builder, rb_intern("parse_file"), 1, script);
411 
412     rb_str_free(script);
413 
414     return res;
415 }
416 
417 
418 static VALUE
419 nxt_ruby_rack_env_create(VALUE arg)
420 {
421     int             rc;
422     VALUE           hash_env, version;
423     nxt_ruby_ctx_t  *rctx;
424 
425     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
426 
427     rc = nxt_ruby_init_io(rctx);
428     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
429         return Qnil;
430     }
431 
432     hash_env = rb_hash_new();
433 
434     rb_hash_aset(hash_env, rb_str_new2("SERVER_SOFTWARE"),
435                  rb_str_new((const char *) nxt_server.start,
436                             (long) nxt_server.length));
437 
438     version = rb_ary_new();
439 
440     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR));
441     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));
442 
443     rb_hash_aset(hash_env, rb_str_new2("rack.version"), version);
444     rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input);
445     rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error);
446     rb_hash_aset(hash_env, rb_str_new2("rack.multithread"),
447                  nxt_ruby_threads > 1 ? Qtrue : Qfalse);
448     rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue);
449     rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse);
450     rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse);
451     rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil);
452     rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil);
453 
454     rctx->env = hash_env;
455 
456     rb_gc_register_address(&rctx->env);
457 
458     return hash_env;
459 }
460 
461 
462 static int
463 nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
464 {
465     VALUE  io_input, io_error;
466 
467     io_input = nxt_ruby_stream_io_input_init();
468 
469     rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1,
470                                    (VALUE) (uintptr_t) rctx);
471     if (nxt_slow_path(rctx->io_input == Qnil)) {
472         nxt_unit_alert(NULL,
473                        "Ruby: Failed to create environment 'rack.input' var");
474 
475         return NXT_UNIT_ERROR;
476     }
477 
478     rb_gc_register_address(&rctx->io_input);
479 
480     io_error = nxt_ruby_stream_io_error_init();
481 
482     rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1,
483                                    (VALUE) (uintptr_t) rctx);
484     if (nxt_slow_path(rctx->io_error == Qnil)) {
485         nxt_unit_alert(NULL,
486                        "Ruby: Failed to create environment 'rack.error' var");
487 
488         return NXT_UNIT_ERROR;
489     }
490 
491     rb_gc_register_address(&rctx->io_error);
492 
493     return NXT_UNIT_OK;
494 }
495 
496 
497 static void
498 nxt_ruby_request_handler(nxt_unit_request_info_t *req)
499 {
500     (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);
501 }
502 
503 
504 static void *
505 nxt_ruby_request_handler_gvl(void *data)
506 {
507     int                      state;
508     VALUE                    res;
509     nxt_ruby_ctx_t           *rctx;
510     nxt_unit_request_info_t  *req;
511 
512     req = data;
513 
514     rctx = req->ctx->data;
515     rctx->req = req;
516 
517     res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);
518     if (nxt_slow_path(res == Qnil || state != 0)) {
519         nxt_ruby_exception_log(req, NXT_LOG_ERR,
520                                "Failed to run ruby script");
521 
522         nxt_unit_request_done(req, NXT_UNIT_ERROR);
523 
524     } else {
525         nxt_unit_request_done(req, NXT_UNIT_OK);
526     }
527 
528     rctx->req = NULL;
529 
530     return NULL;
531 }
532 
533 
534 static VALUE
535 nxt_ruby_rack_app_run(VALUE arg)
536 {
537     int                      rc;
538     VALUE                    env, result;
539     nxt_int_t                status;
540     nxt_ruby_ctx_t           *rctx;
541     nxt_unit_request_info_t  *req;
542 
543     req = (nxt_unit_request_info_t *) arg;
544 
545     rctx = req->ctx->data;
546 
547     env = rb_hash_dup(rctx->env);
548 
549     rc = nxt_ruby_read_request(req, env);
550     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
551         nxt_unit_req_alert(req,
552                            "Ruby: Failed to process incoming request");
553 
554         goto fail;
555     }
556 
557     result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
558     if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
559         nxt_unit_req_error(req,
560                            "Ruby: Invalid response format from application");
561 
562         goto fail;
563     }
564 
565     if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
566         nxt_unit_req_error(req,
567                            "Ruby: Invalid response format from application. "
568                            "Need 3 entries [Status, Headers, Body]");
569 
570         goto fail;
571     }
572 
573     status = nxt_ruby_rack_result_status(req, result);
574     if (nxt_slow_path(status < 0)) {
575         nxt_unit_req_error(req,
576                            "Ruby: Invalid response status from application.");
577 
578         goto fail;
579     }
580 
581     rc = nxt_ruby_rack_result_headers(req, result, status);
582     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
583         goto fail;
584     }
585 
586     rc = nxt_ruby_rack_result_body(req, result);
587     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
588         goto fail;
589     }
590 
591     rb_hash_delete(env, rb_obj_id(env));
592 
593     return result;
594 
595 fail:
596 
597     rb_hash_delete(env, rb_obj_id(env));
598 
599     return Qnil;
600 }
601 
602 
603 static int
604 nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
605 {
606     VALUE               name;
607     uint32_t            i;
608     nxt_unit_field_t    *f;
609     nxt_unit_request_t  *r;
610 
611     r = req->request;
612 
613     nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
614                       r->method_length);
615     nxt_ruby_add_sptr(hash_env, nxt_rb_request_uri_str, &r->target,
616                       r->target_length);
617     nxt_ruby_add_sptr(hash_env, nxt_rb_path_info_str, &r->path, r->path_length);
618     nxt_ruby_add_sptr(hash_env, nxt_rb_query_string_str, &r->query,
619                       r->query_length);
620     nxt_ruby_add_sptr(hash_env, nxt_rb_server_protocol_str, &r->version,
621                       r->version_length);
622     nxt_ruby_add_sptr(hash_env, nxt_rb_remote_addr_str, &r->remote,
623                       r->remote_length);
624     nxt_ruby_add_sptr(hash_env, nxt_rb_server_addr_str, &r->local,
625                       r->local_length);
626     nxt_ruby_add_sptr(hash_env, nxt_rb_server_name_str, &r->server_name,
627                       r->server_name_length);
628 
629     rb_hash_aset(hash_env, nxt_rb_server_port_str, nxt_rb_80_str);
630 
631     rb_hash_aset(hash_env, nxt_rb_rack_url_scheme_str,
632                  r->tls ? nxt_rb_https_str : nxt_rb_http_str);
633 
634     for (i = 0; i < r->fields_count; i++) {
635         f = r->fields + i;
636 
637         name = rb_str_new(nxt_unit_sptr_get(&f->name), f->name_length);
638 
639         nxt_ruby_add_sptr(hash_env, name, &f->value, f->value_length);
640     }
641 
642     if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
643         f = r->fields + r->content_length_field;
644 
645         nxt_ruby_add_sptr(hash_env, nxt_rb_content_length_str,
646                           &f->value, f->value_length);
647     }
648 
649     if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
650         f = r->fields + r->content_type_field;
651 
652         nxt_ruby_add_sptr(hash_env, nxt_rb_content_type_str,
653                           &f->value, f->value_length);
654     }
655 
656     return NXT_UNIT_OK;
657 }
658 
659 
660 nxt_inline void
661 nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
662     nxt_unit_sptr_t *sptr, uint32_t len)
663 {
664     char  *str;
665 
666     str = nxt_unit_sptr_get(sptr);
667 
668     rb_hash_aset(hash_env, name, rb_str_new(str, len));
669 }
670 
671 
672 static nxt_int_t
673 nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
674 {
675     VALUE   status;
676 
677     status = rb_ary_entry(result, 0);
678 
679     if (TYPE(status) == T_FIXNUM) {
680         return FIX2INT(status);
681     }
682 
683     if (TYPE(status) == T_STRING) {
684         return nxt_int_parse((u_char *) RSTRING_PTR(status),
685                              RSTRING_LEN(status));
686     }
687 
688     nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
689                        "format from application");
690 
691     return -2;
692 }
693 
694 
695 typedef struct {
696     int                      rc;
697     uint32_t                 fields;
698     uint32_t                 size;
699     nxt_unit_request_info_t  *req;
700 } nxt_ruby_headers_info_t;
701 
702 
703 static int
704 nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
705     nxt_int_t status)
706 {
707     int                      rc;
708     VALUE                    headers;
709     nxt_ruby_headers_info_t  headers_info;
710 
711     headers = rb_ary_entry(result, 1);
712     if (nxt_slow_path(TYPE(headers) != T_HASH)) {
713         nxt_unit_req_error(req,
714                            "Ruby: Invalid response 'headers' format from "
715                            "application");
716 
717         return NXT_UNIT_ERROR;
718     }
719 
720     rc = NXT_UNIT_OK;
721 
722     headers_info.rc = NXT_UNIT_OK;
723     headers_info.fields = 0;
724     headers_info.size = 0;
725     headers_info.req = req;
726 
727     rb_hash_foreach(headers, nxt_ruby_hash_info,
728                     (VALUE) (uintptr_t) &headers_info);
729     if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
730         return headers_info.rc;
731     }
732 
733     rc = nxt_unit_response_init(req, status,
734                                 headers_info.fields, headers_info.size);
735     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
736         return rc;
737     }
738 
739     rb_hash_foreach(headers, nxt_ruby_hash_add,
740                     (VALUE) (uintptr_t) &headers_info);
741 
742     return rc;
743 }
744 
745 
746 static int
747 nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
748 {
749     const char               *value, *value_end, *pos;
750     nxt_ruby_headers_info_t  *headers_info;
751 
752     headers_info = (void *) (uintptr_t) arg;
753 
754     if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
755         nxt_unit_req_error(headers_info->req,
756                            "Ruby: Wrong header entry 'key' from application");
757 
758         goto fail;
759     }
760 
761     if (nxt_slow_path(TYPE(r_value) != T_STRING)) {
762         nxt_unit_req_error(headers_info->req,
763                            "Ruby: Wrong header entry 'value' from application");
764 
765         goto fail;
766     }
767 
768     value = RSTRING_PTR(r_value);
769     value_end = value + RSTRING_LEN(r_value);
770 
771     pos = value;
772 
773     for ( ;; ) {
774         pos = strchr(pos, '\n');
775 
776         if (pos == NULL) {
777             break;
778         }
779 
780         headers_info->fields++;
781         headers_info->size += RSTRING_LEN(r_key) + (pos - value);
782 
783         pos++;
784         value = pos;
785     }
786 
787     if (value <= value_end) {
788         headers_info->fields++;
789         headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
790     }
791 
792     return ST_CONTINUE;
793 
794 fail:
795 
796     headers_info->rc = NXT_UNIT_ERROR;
797 
798     return ST_STOP;
799 }
800 
801 
802 static int
803 nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
804 {
805     int                      *rc;
806     uint32_t                 key_len;
807     const char               *value, *value_end, *pos;
808     nxt_ruby_headers_info_t  *headers_info;
809 
810     headers_info = (void *) (uintptr_t) arg;
811     rc = &headers_info->rc;
812 
813     value = RSTRING_PTR(r_value);
814     value_end = value + RSTRING_LEN(r_value);
815 
816     key_len = RSTRING_LEN(r_key);
817 
818     pos = value;
819 
820     for ( ;; ) {
821         pos = strchr(pos, '\n');
822 
823         if (pos == NULL) {
824             break;
825         }
826 
827         *rc = nxt_unit_response_add_field(headers_info->req,
828                                           RSTRING_PTR(r_key), key_len,
829                                           value, pos - value);
830         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
831             goto fail;
832         }
833 
834         pos++;
835         value = pos;
836     }
837 
838     if (value <= value_end) {
839         *rc = nxt_unit_response_add_field(headers_info->req,
840                                           RSTRING_PTR(r_key), key_len,
841                                           value, value_end - value);
842         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
843             goto fail;
844         }
845     }
846 
847     return ST_CONTINUE;
848 
849 fail:
850 
851     *rc = NXT_UNIT_ERROR;
852 
853     return ST_STOP;
854 }
855 
856 
857 static int
858 nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
859 {
860     int    rc;
861     VALUE  fn, body;
862 
863     body = rb_ary_entry(result, 2);
864 
865     if (rb_respond_to(body, rb_intern("to_path"))) {
866 
867         fn = rb_funcall(body, rb_intern("to_path"), 0);
868         if (nxt_slow_path(TYPE(fn) != T_STRING)) {
869             nxt_unit_req_error(req,
870                                "Ruby: Failed to get 'body' file path from "
871                                "application");
872 
873             return NXT_UNIT_ERROR;
874         }
875 
876         rc = nxt_ruby_rack_result_body_file_write(req, fn);
877         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
878             return rc;
879         }
880 
881     } else if (rb_respond_to(body, rb_intern("each"))) {
882         rb_block_call(body, rb_intern("each"), 0, 0,
883                       nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);
884 
885     } else {
886         nxt_unit_req_error(req,
887                            "Ruby: Invalid response 'body' format "
888                            "from application");
889 
890         return NXT_UNIT_ERROR;
891     }
892 
893     if (rb_respond_to(body, rb_intern("close"))) {
894         rb_funcall(body, rb_intern("close"), 0);
895     }
896 
897     return NXT_UNIT_OK;
898 }
899 
900 
901 typedef struct {
902     int    fd;
903     off_t  pos;
904     off_t  rest;
905 } nxt_ruby_rack_file_t;
906 
907 
908 static ssize_t
909 nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
910 {
911     ssize_t               res;
912     nxt_ruby_rack_file_t  *file;
913 
914     file = read_info->data;
915 
916     size = nxt_min(size, (size_t) file->rest);
917 
918     res = pread(file->fd, dst, size, file->pos);
919 
920     if (res >= 0) {
921         file->pos += res;
922         file->rest -= res;
923 
924         if (size > (size_t) res) {
925             file->rest = 0;
926         }
927     }
928 
929     read_info->eof = file->rest == 0;
930 
931     return res;
932 }
933 
934 
935 typedef struct {
936     nxt_unit_read_info_t     read_info;
937     nxt_unit_request_info_t  *req;
938 } nxt_ruby_read_info_t;
939 
940 
941 static int
942 nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
943     VALUE filepath)
944 {
945     int                   fd, rc;
946     struct stat           finfo;
947     nxt_ruby_rack_file_t  ruby_file;
948     nxt_ruby_read_info_t  ri;
949 
950     fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
951     if (nxt_slow_path(fd == -1)) {
952         nxt_unit_req_error(req,
953                            "Ruby: Failed to open content file \"%s\": %s (%d)",
954                            RSTRING_PTR(filepath), strerror(errno), errno);
955 
956         return NXT_UNIT_ERROR;
957     }
958 
959     rc = fstat(fd, &finfo);
960     if (nxt_slow_path(rc == -1)) {
961         nxt_unit_req_error(req,
962                            "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
963                            RSTRING_PTR(filepath), strerror(errno), errno);
964 
965         close(fd);
966 
967         return NXT_UNIT_ERROR;
968     }
969 
970     ruby_file.fd = fd;
971     ruby_file.pos = 0;
972     ruby_file.rest = finfo.st_size;
973 
974     ri.read_info.read = nxt_ruby_rack_file_read;
975     ri.read_info.eof = ruby_file.rest == 0;
976     ri.read_info.buf_size = ruby_file.rest;
977     ri.read_info.data = &ruby_file;
978     ri.req = req;
979 
980     rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
981                                                &ri,
982                                                nxt_ruby_ubf,
983                                                req->ctx);
984 
985     close(fd);
986 
987     return rc;
988 }
989 
990 
991 static void *
992 nxt_ruby_response_write_cb(void *data)
993 {
994     int                   rc;
995     nxt_ruby_read_info_t  *ri;
996 
997     ri = data;
998 
999     rc = nxt_unit_response_write_cb(ri->req, &ri->read_info);
1000     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1001         nxt_unit_req_error(ri->req, "Ruby: Failed to write content file.");
1002     }
1003 
1004     return (void *) (intptr_t) rc;
1005 }
1006 
1007 
1008 typedef struct {
1009     VALUE                    body;
1010     nxt_unit_request_info_t  *req;
1011 } nxt_ruby_write_info_t;
1012 
1013 
1014 static VALUE
1015 nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
1016     const VALUE *argv, VALUE blockarg)
1017 {
1018     nxt_ruby_write_info_t  wi;
1019 
1020     if (TYPE(body) != T_STRING) {
1021         return Qnil;
1022     }
1023 
1024     wi.body = body;
1025     wi.req = (void *) (uintptr_t) arg;
1026 
1027     (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
1028                                       (void *) (uintptr_t) &wi,
1029                                       nxt_ruby_ubf, wi.req->ctx);
1030 
1031     return Qnil;
1032 }
1033 
1034 
1035 static void *
1036 nxt_ruby_response_write(void *data)
1037 {
1038     int                    rc;
1039     nxt_ruby_write_info_t  *wi;
1040 
1041     wi = data;
1042 
1043     rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body),
1044                                  RSTRING_LEN(wi->body));
1045     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1046         nxt_unit_req_error(wi->req,
1047                            "Ruby: Failed to write 'body' from application");
1048     }
1049 
1050     return (void *) (intptr_t) rc;
1051 }
1052 
1053 
1054 static void
1055 nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
1056     const char *desc)
1057 {
1058     int    i;
1059     VALUE  err, ary, eclass, msg;
1060 
1061     nxt_unit_req_log(req, level, "Ruby: %s", desc);
1062 
1063     err = rb_errinfo();
1064     if (nxt_slow_path(err == Qnil)) {
1065         return;
1066     }
1067 
1068     ary = rb_funcall(err, rb_intern("backtrace"), 0);
1069     if (nxt_slow_path(RARRAY_LEN(ary) == 0)) {
1070         return;
1071     }
1072 
1073     eclass = rb_class_name(rb_class_of(err));
1074     msg = rb_funcall(err, rb_intern("message"), 0);
1075 
1076     nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
1077                      RSTRING_PTR(RARRAY_PTR(ary)[0]),
1078                      RSTRING_PTR(msg), RSTRING_PTR(eclass));
1079 
1080     for (i = 1; i < RARRAY_LEN(ary); i++) {
1081         nxt_unit_req_log(req, level, "from %s",
1082                          RSTRING_PTR(RARRAY_PTR(ary)[i]));
1083     }
1084 }
1085 
1086 
1087 static void
1088 nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
1089 {
1090     if (rctx->io_input != Qnil) {
1091         rb_gc_unregister_address(&rctx->io_input);
1092     }
1093 
1094     if (rctx->io_error != Qnil) {
1095         rb_gc_unregister_address(&rctx->io_error);
1096     }
1097 
1098     if (rctx->env != Qnil) {
1099         rb_gc_unregister_address(&rctx->env);
1100     }
1101 }
1102 
1103 
1104 static void
1105 nxt_ruby_atexit(void)
1106 {
1107     if (nxt_ruby_rackup != Qnil) {
1108         rb_gc_unregister_address(&nxt_ruby_rackup);
1109     }
1110 
1111     if (nxt_ruby_call != Qnil) {
1112         rb_gc_unregister_address(&nxt_ruby_call);
1113     }
1114 
1115     nxt_ruby_done_strings();
1116 
1117     ruby_cleanup(0);
1118 }
1119 
1120 
1121 static int
1122 nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
1123 {
1124     VALUE                res;
1125     uint32_t             i;
1126     nxt_ruby_ctx_t       *rctx;
1127     nxt_ruby_app_conf_t  *c;
1128 
1129     /* Worker thread context. */
1130     if (!nxt_unit_is_main_ctx(ctx)) {
1131         return NXT_UNIT_OK;
1132     }
1133 
1134     c = ctx->unit->data;
1135 
1136     if (c->threads <= 1) {
1137         return NXT_UNIT_OK;
1138     }
1139 
1140     for (i = 0; i < c->threads - 1; i++) {
1141         rctx = &nxt_ruby_ctxs[i];
1142 
1143         rctx->ctx = ctx;
1144 
1145         res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx);
1146 
1147         if (nxt_fast_path(res != Qnil)) {
1148             nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
1149 
1150             rctx->thread = res;
1151 
1152         } else {
1153             nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1));
1154 
1155             return NXT_UNIT_ERROR;
1156         }
1157     }
1158 
1159     return NXT_UNIT_OK;
1160 }
1161 
1162 
1163 static void *
1164 nxt_ruby_thread_create_gvl(void *rctx)
1165 {
1166     VALUE  res;
1167 
1168     res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
1169 
1170     return (void *) (uintptr_t) res;
1171 }
1172 
1173 
1174 static VALUE
1175 nxt_ruby_thread_func(VALUE arg)
1176 {
1177     nxt_unit_ctx_t  *ctx;
1178     nxt_ruby_ctx_t  *rctx;
1179 
1180     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
1181 
1182     nxt_unit_debug(rctx->ctx, "worker thread start");
1183 
1184     ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);
1185     if (nxt_slow_path(ctx == NULL)) {
1186         goto fail;
1187     }
1188 
1189     (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
1190                                       nxt_ruby_ubf, ctx);
1191 
1192     nxt_unit_done(ctx);
1193 
1194 fail:
1195 
1196     nxt_unit_debug(NULL, "worker thread end");
1197 
1198     return Qnil;
1199 }
1200 
1201 
1202 static void *
1203 nxt_ruby_unit_run(void *ctx)
1204 {
1205     return (void *) (intptr_t) nxt_unit_run(ctx);
1206 }
1207 
1208 
1209 static void
1210 nxt_ruby_ubf(void *ctx)
1211 {
1212     nxt_unit_warn(ctx, "Ruby: UBF");
1213 }
1214 
1215 
1216 static int
1217 nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
1218 {
1219     int             state;
1220     uint32_t        i;
1221     nxt_ruby_ctx_t  *rctx;
1222 
1223     if (c->threads <= 1) {
1224         return NXT_UNIT_OK;
1225     }
1226 
1227     nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
1228                                           * (c->threads - 1));
1229     if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
1230         nxt_unit_alert(NULL, "Failed to allocate run contexts array");
1231 
1232         return NXT_UNIT_ERROR;
1233     }
1234 
1235     for (i = 0; i < c->threads - 1; i++) {
1236         rctx = &nxt_ruby_ctxs[i];
1237 
1238         rctx->env = Qnil;
1239         rctx->io_input = Qnil;
1240         rctx->io_error = Qnil;
1241         rctx->thread = Qnil;
1242     }
1243 
1244     for (i = 0; i < c->threads - 1; i++) {
1245         rctx = &nxt_ruby_ctxs[i];
1246 
1247         rctx->env = rb_protect(nxt_ruby_rack_env_create,
1248                                (VALUE) (uintptr_t) rctx, &state);
1249         if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
1250             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
1251                                    "Failed to create 'environ' variable");
1252             return NXT_UNIT_ERROR;
1253         }
1254     }
1255 
1256     return NXT_UNIT_OK;
1257 }
1258 
1259 
1260 static void
1261 nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
1262 {
1263     uint32_t        i;
1264     nxt_ruby_ctx_t  *rctx;
1265 
1266     if (nxt_ruby_ctxs == NULL) {
1267         return;
1268     }
1269 
1270     for (i = 0; i < c->threads - 1; i++) {
1271         rctx = &nxt_ruby_ctxs[i];
1272 
1273         if (rctx->thread != Qnil) {
1274             rb_funcall(rctx->thread, rb_intern("join"), 0);
1275 
1276             nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
1277 
1278         } else {
1279             nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
1280         }
1281     }
1282 
1283     for (i = 0; i < c->threads - 1; i++) {
1284         nxt_ruby_ctx_done(&nxt_ruby_ctxs[i]);
1285     }
1286 
1287     nxt_unit_free(ctx, nxt_ruby_ctxs);
1288 }
1289