xref: /unit/src/ruby/nxt_ruby.c (revision 2609:f2a0806125dc)
1 /*
2  * Copyright (C) Alexander Borisov
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <ruby/nxt_ruby.h>
7 
8 #include <nxt_unit.h>
9 #include <nxt_unit_request.h>
10 
11 #include <ruby/thread.h>
12 
13 #include NXT_RUBY_MOUNTS_H
14 
15 #include <locale.h>
16 
17 
/* Rack API version published to applications as env["rack.version"]. */
#define NXT_RUBY_RACK_API_VERSION_MAJOR  1
#define NXT_RUBY_RACK_API_VERSION_MINOR  3
20 
21 
22 typedef struct {
23     nxt_task_t      *task;
24     nxt_str_t       *script;
25     nxt_ruby_ctx_t  *rctx;
26 } nxt_ruby_rack_init_t;
27 
28 
29 static nxt_int_t nxt_ruby_start(nxt_task_t *task,
30     nxt_process_data_t *data);
31 static VALUE nxt_ruby_init_basic(VALUE arg);
32 
33 static VALUE nxt_ruby_hook_procs_load(VALUE path);
34 static VALUE nxt_ruby_hook_register(VALUE arg);
35 static VALUE nxt_ruby_hook_call(VALUE name);
36 
37 static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
38 
39 static VALUE nxt_ruby_require_rubygems(VALUE arg);
40 static VALUE nxt_ruby_bundler_setup(VALUE arg);
41 static VALUE nxt_ruby_require_rack(VALUE arg);
42 static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
43 static VALUE nxt_ruby_rack_env_create(VALUE arg);
44 static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
45 static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
46 static void *nxt_ruby_request_handler_gvl(void *req);
47 static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
48 static void *nxt_ruby_thread_create_gvl(void *rctx);
49 static VALUE nxt_ruby_thread_func(VALUE arg);
50 static void *nxt_ruby_unit_run(void *ctx);
51 static void nxt_ruby_ubf(void *ctx);
52 static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
53 static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
54     nxt_ruby_app_conf_t *c);
55 
56 static VALUE nxt_ruby_rack_app_run(VALUE arg);
57 static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
58 nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
59     nxt_unit_sptr_t *sptr, uint32_t len);
60 static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
61     VALUE result);
62 static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
63     VALUE result, nxt_int_t status);
64 static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
65 static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
66 static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
67     VALUE result);
68 static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
69     VALUE filepath);
70 static void *nxt_ruby_response_write_cb(void *read_info);
71 static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
72     int argc, const VALUE *argv, VALUE blockarg);
73 static void *nxt_ruby_response_write(void *body);
74 
75 static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
76     uint32_t level, const char *desc);
77 
78 static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
79 static void nxt_ruby_atexit(void);
80 
81 
82 static uint32_t  compat[] = {
83     NXT_VERNUM, NXT_DEBUG,
84 };
85 
86 static VALUE  nxt_ruby_hook_procs;
87 static VALUE  nxt_ruby_rackup;
88 static VALUE  nxt_ruby_call;
89 
90 static uint32_t        nxt_ruby_threads;
91 static nxt_ruby_ctx_t  *nxt_ruby_ctxs;
92 
93 NXT_EXPORT nxt_app_module_t  nxt_app_module = {
94     sizeof(compat),
95     compat,
96     nxt_string("ruby"),
97     ruby_version,
98     nxt_ruby_mounts,
99     nxt_nitems(nxt_ruby_mounts),
100     NULL,
101     nxt_ruby_start,
102 };
103 
104 typedef struct {
105     nxt_str_t  string;
106     VALUE      *v;
107 } nxt_ruby_string_t;
108 
109 static VALUE  nxt_rb_80_str;
110 static VALUE  nxt_rb_content_length_str;
111 static VALUE  nxt_rb_content_type_str;
112 static VALUE  nxt_rb_http_str;
113 static VALUE  nxt_rb_https_str;
114 static VALUE  nxt_rb_path_info_str;
115 static VALUE  nxt_rb_query_string_str;
116 static VALUE  nxt_rb_rack_url_scheme_str;
117 static VALUE  nxt_rb_remote_addr_str;
118 static VALUE  nxt_rb_request_method_str;
119 static VALUE  nxt_rb_request_uri_str;
120 static VALUE  nxt_rb_server_addr_str;
121 static VALUE  nxt_rb_server_name_str;
122 static VALUE  nxt_rb_server_port_str;
123 static VALUE  nxt_rb_server_protocol_str;
124 static VALUE  nxt_rb_on_worker_boot;
125 static VALUE  nxt_rb_on_worker_shutdown;
126 static VALUE  nxt_rb_on_thread_boot;
127 static VALUE  nxt_rb_on_thread_shutdown;
128 
129 static nxt_ruby_string_t nxt_rb_strings[] = {
130     { nxt_string("80"), &nxt_rb_80_str },
131     { nxt_string("CONTENT_LENGTH"), &nxt_rb_content_length_str },
132     { nxt_string("CONTENT_TYPE"), &nxt_rb_content_type_str },
133     { nxt_string("http"), &nxt_rb_http_str },
134     { nxt_string("https"), &nxt_rb_https_str },
135     { nxt_string("PATH_INFO"), &nxt_rb_path_info_str },
136     { nxt_string("QUERY_STRING"), &nxt_rb_query_string_str },
137     { nxt_string("rack.url_scheme"), &nxt_rb_rack_url_scheme_str },
138     { nxt_string("REMOTE_ADDR"), &nxt_rb_remote_addr_str },
139     { nxt_string("REQUEST_METHOD"), &nxt_rb_request_method_str },
140     { nxt_string("REQUEST_URI"), &nxt_rb_request_uri_str },
141     { nxt_string("SERVER_ADDR"), &nxt_rb_server_addr_str },
142     { nxt_string("SERVER_NAME"), &nxt_rb_server_name_str },
143     { nxt_string("SERVER_PORT"), &nxt_rb_server_port_str },
144     { nxt_string("SERVER_PROTOCOL"), &nxt_rb_server_protocol_str },
145     { nxt_string("on_worker_boot"), &nxt_rb_on_worker_boot },
146     { nxt_string("on_worker_shutdown"), &nxt_rb_on_worker_shutdown },
147     { nxt_string("on_thread_boot"), &nxt_rb_on_thread_boot },
148     { nxt_string("on_thread_shutdown"), &nxt_rb_on_thread_shutdown },
149     { nxt_null_string, NULL },
150 };
151 
152 
153 static int
nxt_ruby_init_strings(void)154 nxt_ruby_init_strings(void)
155 {
156     VALUE              v;
157     nxt_ruby_string_t  *pstr;
158 
159     pstr = nxt_rb_strings;
160 
161     while (pstr->string.start != NULL) {
162         v = rb_str_new_static((char *) pstr->string.start, pstr->string.length);
163 
164         if (nxt_slow_path(v == Qnil)) {
165             nxt_unit_alert(NULL, "Ruby: failed to create const string '%.*s'",
166                            (int) pstr->string.length,
167                            (char *) pstr->string.start);
168 
169             return NXT_UNIT_ERROR;
170         }
171 
172         *pstr->v = v;
173 
174         rb_gc_register_address(pstr->v);
175 
176         pstr++;
177     }
178 
179     return NXT_UNIT_OK;
180 }
181 
182 
183 static void
nxt_ruby_done_strings(void)184 nxt_ruby_done_strings(void)
185 {
186     nxt_ruby_string_t  *pstr;
187 
188     pstr = nxt_rb_strings;
189 
190     while (pstr->string.start != NULL) {
191         rb_gc_unregister_address(pstr->v);
192 
193         *pstr->v = Qnil;
194 
195         pstr++;
196     }
197 }
198 
199 
200 static VALUE
nxt_ruby_hook_procs_load(VALUE path)201 nxt_ruby_hook_procs_load(VALUE path)
202 {
203     VALUE  module, file, file_obj;
204 
205     module = rb_define_module("Unit");
206 
207     nxt_ruby_hook_procs = rb_hash_new();
208 
209     rb_gc_register_address(&nxt_ruby_hook_procs);
210 
211     rb_define_module_function(module, "on_worker_boot",
212                               &nxt_ruby_hook_register, 0);
213     rb_define_module_function(module, "on_worker_shutdown",
214                               &nxt_ruby_hook_register, 0);
215     rb_define_module_function(module, "on_thread_boot",
216                               &nxt_ruby_hook_register, 0);
217     rb_define_module_function(module, "on_thread_shutdown",
218                               &nxt_ruby_hook_register, 0);
219 
220     file = rb_const_get(rb_cObject, rb_intern("File"));
221     file_obj = rb_funcall(file, rb_intern("read"), 1, path);
222 
223     return rb_funcall(module, rb_intern("module_eval"), 3, file_obj, path,
224                       INT2NUM(1));
225 }
226 
227 
228 static VALUE
nxt_ruby_hook_register(VALUE arg)229 nxt_ruby_hook_register(VALUE arg)
230 {
231     VALUE  kernel, callee, callee_str;
232 
233     rb_need_block();
234 
235     kernel = rb_const_get(rb_cObject, rb_intern("Kernel"));
236     callee = rb_funcall(kernel, rb_intern("__callee__"), 0);
237     callee_str = rb_funcall(callee, rb_intern("to_s"), 0);
238 
239     rb_hash_aset(nxt_ruby_hook_procs, callee_str, rb_block_proc());
240 
241     return Qnil;
242 }
243 
244 
245 static VALUE
nxt_ruby_hook_call(VALUE name)246 nxt_ruby_hook_call(VALUE name)
247 {
248     VALUE  proc;
249 
250     proc = rb_hash_lookup(nxt_ruby_hook_procs, name);
251     if (proc == Qnil) {
252         return Qnil;
253     }
254 
255     return rb_funcall(proc, rb_intern("call"), 0);
256 }
257 
258 
259 static nxt_int_t
nxt_ruby_start(nxt_task_t * task,nxt_process_data_t * data)260 nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
261 {
262     int                    state, rc;
263     VALUE                  res, path;
264     nxt_ruby_ctx_t         ruby_ctx;
265     nxt_unit_ctx_t         *unit_ctx;
266     nxt_unit_init_t        ruby_unit_init;
267     nxt_ruby_app_conf_t    *c;
268     nxt_ruby_rack_init_t   rack_init;
269     nxt_common_app_conf_t  *conf;
270 
271     static char  *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
272 
273     signal(SIGINT, SIG_IGN);
274 
275     conf = data->app;
276     c = &conf->u.ruby;
277 
278     nxt_ruby_threads = c->threads;
279 
280     setlocale(LC_CTYPE, "");
281 
282     RUBY_INIT_STACK
283     ruby_init();
284     ruby_options(2, argv);
285     ruby_script("NGINX_Unit");
286 
287     ruby_ctx.env = Qnil;
288     ruby_ctx.io_input = Qnil;
289     ruby_ctx.io_error = Qnil;
290     ruby_ctx.thread = Qnil;
291     ruby_ctx.ctx = NULL;
292     ruby_ctx.req = NULL;
293 
294     rack_init.task = task;
295     rack_init.script = &c->script;
296     rack_init.rctx = &ruby_ctx;
297 
298     nxt_ruby_init_strings();
299 
300     res = rb_protect(nxt_ruby_init_basic,
301                      (VALUE) (uintptr_t) &rack_init, &state);
302     if (nxt_slow_path(res == Qnil || state != 0)) {
303         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
304                                "Failed to init basic variables");
305         return NXT_ERROR;
306     }
307 
308     nxt_ruby_call = Qnil;
309     nxt_ruby_hook_procs = Qnil;
310 
311     if (c->hooks.start != NULL) {
312         path = rb_str_new((const char *) c->hooks.start,
313                           (long) c->hooks.length);
314 
315         rb_protect(nxt_ruby_hook_procs_load, path, &state);
316         rb_str_free(path);
317         if (nxt_slow_path(state != 0)) {
318             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
319                                    "Failed to setup hooks");
320             return NXT_ERROR;
321         }
322     }
323 
324     if (nxt_ruby_hook_procs != Qnil) {
325         rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_boot, &state);
326         if (nxt_slow_path(state != 0)) {
327             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
328                                    "Failed to call on_worker_boot()");
329             return NXT_ERROR;
330         }
331     }
332 
333     nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
334     if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
335         return NXT_ERROR;
336     }
337 
338     rb_gc_register_address(&nxt_ruby_rackup);
339 
340     nxt_ruby_call = rb_intern("call");
341     if (nxt_slow_path(nxt_ruby_call == Qnil)) {
342         nxt_alert(task, "Ruby: Unable to find rack entry point");
343 
344         goto fail;
345     }
346 
347     rb_gc_register_address(&nxt_ruby_call);
348 
349     ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
350                               (VALUE) (uintptr_t) &ruby_ctx, &state);
351     if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
352         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
353                                "Failed to create 'environ' variable");
354         goto fail;
355     }
356 
357     rc = nxt_ruby_init_threads(c);
358     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
359         goto fail;
360     }
361 
362     nxt_unit_default_init(task, &ruby_unit_init, conf);
363 
364     ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
365     ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
366     ruby_unit_init.data = c;
367     ruby_unit_init.ctx_data = &ruby_ctx;
368 
369     unit_ctx = nxt_unit_init(&ruby_unit_init);
370     if (nxt_slow_path(unit_ctx == NULL)) {
371         goto fail;
372     }
373 
374     if (nxt_ruby_hook_procs != Qnil) {
375         rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);
376         if (nxt_slow_path(state != 0)) {
377             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
378                                    "Failed to call on_thread_boot()");
379         }
380     }
381 
382     rc = (intptr_t) rb_thread_call_without_gvl2(nxt_ruby_unit_run, unit_ctx,
383                                                 nxt_ruby_ubf, unit_ctx);
384 
385     if (nxt_ruby_hook_procs != Qnil) {
386         rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);
387         if (nxt_slow_path(state != 0)) {
388             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
389                                    "Failed to call on_thread_shutdown()");
390         }
391     }
392 
393     nxt_ruby_join_threads(unit_ctx, c);
394 
395     if (nxt_ruby_hook_procs != Qnil) {
396         rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_shutdown, &state);
397         if (nxt_slow_path(state != 0)) {
398             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
399                                    "Failed to call on_worker_shutdown()");
400         }
401     }
402 
403     nxt_unit_done(unit_ctx);
404 
405     nxt_ruby_ctx_done(&ruby_ctx);
406 
407     nxt_ruby_atexit();
408 
409     exit(rc);
410 
411     return NXT_OK;
412 
413 fail:
414 
415     nxt_ruby_join_threads(NULL, c);
416 
417     nxt_ruby_ctx_done(&ruby_ctx);
418 
419     nxt_ruby_atexit();
420 
421     return NXT_ERROR;
422 }
423 
424 
425 static VALUE
nxt_ruby_init_basic(VALUE arg)426 nxt_ruby_init_basic(VALUE arg)
427 {
428     int                   state;
429     nxt_ruby_rack_init_t  *rack_init;
430 
431     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;
432 
433     state = rb_enc_find_index("encdb");
434     if (nxt_slow_path(state == 0)) {
435         nxt_alert(rack_init->task,
436                   "Ruby: Failed to find encoding index 'encdb'");
437 
438         return Qnil;
439     }
440 
441     rb_funcall(rb_cObject, rb_intern("require"), 1,
442                rb_str_new2("enc/trans/transdb"));
443 
444     return arg;
445 }
446 
447 
448 static VALUE
nxt_ruby_rack_init(nxt_ruby_rack_init_t * rack_init)449 nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
450 {
451     int    state;
452     VALUE  rackup, err;
453 
454     rb_protect(nxt_ruby_require_rubygems, Qnil, &state);
455     if (nxt_slow_path(state != 0)) {
456         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
457                                "Failed to require 'rubygems' package");
458         return Qnil;
459     }
460 
461     rb_protect(nxt_ruby_bundler_setup, Qnil, &state);
462     if (state != 0) {
463         err = rb_errinfo();
464 
465         if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) {
466             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
467                                    "Failed to require 'bundler/setup' package");
468             return Qnil;
469         }
470 
471         rb_set_errinfo(Qnil);
472     }
473 
474     rb_protect(nxt_ruby_require_rack, Qnil, &state);
475     if (nxt_slow_path(state != 0)) {
476         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
477                                "Failed to require 'rack' package");
478         return Qnil;
479     }
480 
481     rackup = rb_protect(nxt_ruby_rack_parse_script,
482                         (VALUE) (uintptr_t) rack_init, &state);
483 
484     if (nxt_slow_path(state != 0)) {
485         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
486                                "Failed to parse rack script");
487         return Qnil;
488     }
489 
490     if (TYPE(rackup) != T_ARRAY) {
491         return rackup;
492     }
493 
494     if (nxt_slow_path(RARRAY_LEN(rackup) < 1)) {
495         nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Invalid rack config file");
496         return Qnil;
497     }
498 
499     return RARRAY_PTR(rackup)[0];
500 }
501 
502 
503 static VALUE
nxt_ruby_require_rubygems(VALUE arg)504 nxt_ruby_require_rubygems(VALUE arg)
505 {
506     return rb_funcall(rb_cObject, rb_intern("require"), 1,
507                       rb_str_new2("rubygems"));
508 }
509 
510 
511 static VALUE
nxt_ruby_bundler_setup(VALUE arg)512 nxt_ruby_bundler_setup(VALUE arg)
513 {
514     return rb_funcall(rb_cObject, rb_intern("require"), 1,
515                       rb_str_new2("bundler/setup"));
516 }
517 
518 
519 static VALUE
nxt_ruby_require_rack(VALUE arg)520 nxt_ruby_require_rack(VALUE arg)
521 {
522     return rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("rack"));
523 }
524 
525 
526 static VALUE
nxt_ruby_rack_parse_script(VALUE ctx)527 nxt_ruby_rack_parse_script(VALUE ctx)
528 {
529     VALUE                 script, res, rack, builder;
530     nxt_ruby_rack_init_t  *rack_init;
531 
532     rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;
533 
534     rack = rb_const_get(rb_cObject, rb_intern("Rack"));
535     builder = rb_const_get(rack, rb_intern("Builder"));
536 
537     script = rb_str_new((const char *) rack_init->script->start,
538                         (long) rack_init->script->length);
539 
540     res = rb_funcall(builder, rb_intern("parse_file"), 1, script);
541 
542     rb_str_free(script);
543 
544     return res;
545 }
546 
547 
548 static VALUE
nxt_ruby_rack_env_create(VALUE arg)549 nxt_ruby_rack_env_create(VALUE arg)
550 {
551     int             rc;
552     VALUE           hash_env, version;
553     nxt_ruby_ctx_t  *rctx;
554 
555     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
556 
557     rc = nxt_ruby_init_io(rctx);
558     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559         return Qnil;
560     }
561 
562     hash_env = rb_hash_new();
563 
564     rb_hash_aset(hash_env, rb_str_new2("SERVER_SOFTWARE"),
565                  rb_str_new((const char *) nxt_server.start,
566                             (long) nxt_server.length));
567 
568     version = rb_ary_new();
569 
570     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR));
571     rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));
572 
573     rb_hash_aset(hash_env, rb_str_new2("SCRIPT_NAME"), rb_str_new("", 0));
574     rb_hash_aset(hash_env, rb_str_new2("rack.version"), version);
575     rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input);
576     rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error);
577     rb_hash_aset(hash_env, rb_str_new2("rack.multithread"),
578                  nxt_ruby_threads > 1 ? Qtrue : Qfalse);
579     rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue);
580     rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse);
581     rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse);
582     rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil);
583     rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil);
584 
585     rctx->env = hash_env;
586 
587     rb_gc_register_address(&rctx->env);
588 
589     return hash_env;
590 }
591 
592 
593 static int
nxt_ruby_init_io(nxt_ruby_ctx_t * rctx)594 nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
595 {
596     VALUE  io_input, io_error;
597 
598     io_input = nxt_ruby_stream_io_input_init();
599 
600     rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1,
601                                    (VALUE) (uintptr_t) rctx);
602     if (nxt_slow_path(rctx->io_input == Qnil)) {
603         nxt_unit_alert(NULL,
604                        "Ruby: Failed to create environment 'rack.input' var");
605 
606         return NXT_UNIT_ERROR;
607     }
608 
609     rb_gc_register_address(&rctx->io_input);
610 
611     io_error = nxt_ruby_stream_io_error_init();
612 
613     rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1,
614                                    (VALUE) (uintptr_t) rctx);
615     if (nxt_slow_path(rctx->io_error == Qnil)) {
616         nxt_unit_alert(NULL,
617                        "Ruby: Failed to create environment 'rack.error' var");
618 
619         return NXT_UNIT_ERROR;
620     }
621 
622     rb_gc_register_address(&rctx->io_error);
623 
624     return NXT_UNIT_OK;
625 }
626 
627 
628 static void
nxt_ruby_request_handler(nxt_unit_request_info_t * req)629 nxt_ruby_request_handler(nxt_unit_request_info_t *req)
630 {
631     (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);
632 }
633 
634 
635 static void *
nxt_ruby_request_handler_gvl(void * data)636 nxt_ruby_request_handler_gvl(void *data)
637 {
638     int                      state;
639     VALUE                    res;
640     nxt_ruby_ctx_t           *rctx;
641     nxt_unit_request_info_t  *req;
642 
643     req = data;
644 
645     rctx = req->ctx->data;
646     rctx->req = req;
647 
648     res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);
649     if (nxt_slow_path(res == Qnil || state != 0)) {
650         nxt_ruby_exception_log(req, NXT_LOG_ERR,
651                                "Failed to run ruby script");
652 
653         nxt_unit_request_done(req, NXT_UNIT_ERROR);
654 
655     } else {
656         nxt_unit_request_done(req, NXT_UNIT_OK);
657     }
658 
659     rctx->req = NULL;
660 
661     return NULL;
662 }
663 
664 
665 static VALUE
nxt_ruby_rack_app_run(VALUE arg)666 nxt_ruby_rack_app_run(VALUE arg)
667 {
668     int                      rc;
669     VALUE                    env, result;
670     nxt_int_t                status;
671     nxt_ruby_ctx_t           *rctx;
672     nxt_unit_request_info_t  *req;
673 
674     req = (nxt_unit_request_info_t *) arg;
675 
676     rctx = req->ctx->data;
677 
678     env = rb_hash_dup(rctx->env);
679 
680     rc = nxt_ruby_read_request(req, env);
681     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
682         nxt_unit_req_alert(req,
683                            "Ruby: Failed to process incoming request");
684 
685         goto fail;
686     }
687 
688     result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
689     if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
690         nxt_unit_req_error(req,
691                            "Ruby: Invalid response format from application");
692 
693         goto fail;
694     }
695 
696     if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
697         nxt_unit_req_error(req,
698                            "Ruby: Invalid response format from application. "
699                            "Need 3 entries [Status, Headers, Body]");
700 
701         goto fail;
702     }
703 
704     status = nxt_ruby_rack_result_status(req, result);
705     if (nxt_slow_path(status < 0)) {
706         nxt_unit_req_error(req,
707                            "Ruby: Invalid response status from application.");
708 
709         goto fail;
710     }
711 
712     rc = nxt_ruby_rack_result_headers(req, result, status);
713     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
714         goto fail;
715     }
716 
717     rc = nxt_ruby_rack_result_body(req, result);
718     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
719         goto fail;
720     }
721 
722     rb_hash_delete(env, rb_obj_id(env));
723 
724     return result;
725 
726 fail:
727 
728     rb_hash_delete(env, rb_obj_id(env));
729 
730     return Qnil;
731 }
732 
733 
734 static int
nxt_ruby_read_request(nxt_unit_request_info_t * req,VALUE hash_env)735 nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
736 {
737     VALUE               name;
738     uint32_t            i;
739     nxt_unit_field_t    *f;
740     nxt_unit_request_t  *r;
741 
742     r = req->request;
743 
744     nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
745                       r->method_length);
746     nxt_ruby_add_sptr(hash_env, nxt_rb_request_uri_str, &r->target,
747                       r->target_length);
748     nxt_ruby_add_sptr(hash_env, nxt_rb_path_info_str, &r->path, r->path_length);
749     nxt_ruby_add_sptr(hash_env, nxt_rb_query_string_str, &r->query,
750                       r->query_length);
751     nxt_ruby_add_sptr(hash_env, nxt_rb_server_protocol_str, &r->version,
752                       r->version_length);
753     nxt_ruby_add_sptr(hash_env, nxt_rb_remote_addr_str, &r->remote,
754                       r->remote_length);
755     nxt_ruby_add_sptr(hash_env, nxt_rb_server_addr_str, &r->local_addr,
756                       r->local_addr_length);
757     nxt_ruby_add_sptr(hash_env, nxt_rb_server_name_str, &r->server_name,
758                       r->server_name_length);
759 
760     rb_hash_aset(hash_env, nxt_rb_server_port_str, nxt_rb_80_str);
761 
762     rb_hash_aset(hash_env, nxt_rb_rack_url_scheme_str,
763                  r->tls ? nxt_rb_https_str : nxt_rb_http_str);
764 
765     for (i = 0; i < r->fields_count; i++) {
766         f = r->fields + i;
767 
768         name = rb_str_new(nxt_unit_sptr_get(&f->name), f->name_length);
769 
770         nxt_ruby_add_sptr(hash_env, name, &f->value, f->value_length);
771     }
772 
773     if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
774         f = r->fields + r->content_length_field;
775 
776         nxt_ruby_add_sptr(hash_env, nxt_rb_content_length_str,
777                           &f->value, f->value_length);
778     }
779 
780     if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
781         f = r->fields + r->content_type_field;
782 
783         nxt_ruby_add_sptr(hash_env, nxt_rb_content_type_str,
784                           &f->value, f->value_length);
785     }
786 
787     return NXT_UNIT_OK;
788 }
789 
790 
791 nxt_inline void
nxt_ruby_add_sptr(VALUE hash_env,VALUE name,nxt_unit_sptr_t * sptr,uint32_t len)792 nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
793     nxt_unit_sptr_t *sptr, uint32_t len)
794 {
795     char  *str;
796 
797     str = nxt_unit_sptr_get(sptr);
798 
799     rb_hash_aset(hash_env, name, rb_str_new(str, len));
800 }
801 
802 
803 static nxt_int_t
nxt_ruby_rack_result_status(nxt_unit_request_info_t * req,VALUE result)804 nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
805 {
806     VALUE   status;
807 
808     status = rb_ary_entry(result, 0);
809 
810     if (TYPE(status) == T_FIXNUM) {
811         return FIX2INT(status);
812     }
813 
814     if (TYPE(status) == T_STRING) {
815         return nxt_int_parse((u_char *) RSTRING_PTR(status),
816                              RSTRING_LEN(status));
817     }
818 
819     nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
820                        "format from application");
821 
822     return -2;
823 }
824 
825 
826 typedef struct {
827     int                      rc;
828     uint32_t                 fields;
829     uint32_t                 size;
830     nxt_unit_request_info_t  *req;
831 } nxt_ruby_headers_info_t;
832 
833 
834 static int
nxt_ruby_rack_result_headers(nxt_unit_request_info_t * req,VALUE result,nxt_int_t status)835 nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
836     nxt_int_t status)
837 {
838     int                      rc;
839     VALUE                    headers;
840     nxt_ruby_headers_info_t  headers_info;
841 
842     headers = rb_ary_entry(result, 1);
843     if (nxt_slow_path(TYPE(headers) != T_HASH)) {
844         nxt_unit_req_error(req,
845                            "Ruby: Invalid response 'headers' format from "
846                            "application");
847 
848         return NXT_UNIT_ERROR;
849     }
850 
851     rc = NXT_UNIT_OK;
852 
853     headers_info.rc = NXT_UNIT_OK;
854     headers_info.fields = 0;
855     headers_info.size = 0;
856     headers_info.req = req;
857 
858     rb_hash_foreach(headers, nxt_ruby_hash_info,
859                     (VALUE) (uintptr_t) &headers_info);
860     if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
861         return headers_info.rc;
862     }
863 
864     rc = nxt_unit_response_init(req, status,
865                                 headers_info.fields, headers_info.size);
866     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
867         return rc;
868     }
869 
870     rb_hash_foreach(headers, nxt_ruby_hash_add,
871                     (VALUE) (uintptr_t) &headers_info);
872 
873     return rc;
874 }
875 
876 
877 static int
nxt_ruby_hash_info(VALUE r_key,VALUE r_value,VALUE arg)878 nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
879 {
880     const char               *value, *value_end, *pos;
881     nxt_ruby_headers_info_t  *headers_info;
882 
883     headers_info = (void *) (uintptr_t) arg;
884 
885     if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
886         nxt_unit_req_error(headers_info->req,
887                            "Ruby: Wrong header entry 'key' from application");
888 
889         goto fail;
890     }
891 
892     if (nxt_slow_path(TYPE(r_value) != T_STRING && TYPE(r_value) != T_ARRAY)) {
893         nxt_unit_req_error(headers_info->req,
894                            "Ruby: Wrong header entry 'value' from application");
895 
896         goto fail;
897     }
898 
899     if (TYPE(r_value) == T_ARRAY) {
900         int     i;
901         int     arr_len = RARRAY_LEN(r_value);
902         VALUE   item;
903         size_t  len = 0;
904 
905         for (i = 0; i < arr_len; i++) {
906             item = rb_ary_entry(r_value, i);
907             if (TYPE(item) != T_STRING) {
908                 nxt_unit_req_error(headers_info->req,
909                                    "Ruby: Wrong header entry in 'value' array "
910                                    "from application");
911                 goto fail;
912             }
913 
914             len += RSTRING_LEN(item) + 2;   /* +2 for '; ' */
915         }
916 
917         if (arr_len > 0) {
918             len -= 2;
919         }
920 
921         headers_info->fields++;
922         headers_info->size += RSTRING_LEN(r_key) + len;
923 
924         return ST_CONTINUE;
925     }
926 
927     value = RSTRING_PTR(r_value);
928     value_end = value + RSTRING_LEN(r_value);
929 
930     pos = value;
931 
932     for ( ;; ) {
933         pos = strchr(pos, '\n');
934 
935         if (pos == NULL) {
936             break;
937         }
938 
939         headers_info->fields++;
940         headers_info->size += RSTRING_LEN(r_key) + (pos - value);
941 
942         pos++;
943         value = pos;
944     }
945 
946     if (value <= value_end) {
947         headers_info->fields++;
948         headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
949     }
950 
951     return ST_CONTINUE;
952 
953 fail:
954 
955     headers_info->rc = NXT_UNIT_ERROR;
956 
957     return ST_STOP;
958 }
959 
960 
961 static int
nxt_ruby_hash_add(VALUE r_key,VALUE r_value,VALUE arg)962 nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
963 {
964     int                      *rc;
965     uint32_t                 key_len;
966     const char               *value, *value_end, *pos;
967     nxt_ruby_headers_info_t  *headers_info;
968 
969     headers_info = (void *) (uintptr_t) arg;
970     rc = &headers_info->rc;
971 
972     key_len = RSTRING_LEN(r_key);
973 
974     if (TYPE(r_value) == T_ARRAY) {
975         int     i;
976         int     arr_len = RARRAY_LEN(r_value);
977         char    *field, *p;
978         VALUE   item;
979         size_t  len = 0;
980 
981         for (i = 0; i < arr_len; i++) {
982             item = rb_ary_entry(r_value, i);
983 
984             len += RSTRING_LEN(item) + 2;   /* +2 for '; ' */
985         }
986 
987         field = nxt_unit_malloc(NULL, len);
988         if (field == NULL) {
989             goto fail;
990         }
991 
992         p = field;
993 
994         for (i = 0; i < arr_len; i++) {
995             item = rb_ary_entry(r_value, i);
996 
997             p = nxt_cpymem(p, RSTRING_PTR(item), RSTRING_LEN(item));
998             p = nxt_cpymem(p, "; ", 2);
999         }
1000 
1001         if (arr_len > 0) {
1002             len -= 2;
1003         }
1004 
1005         *rc = nxt_unit_response_add_field(headers_info->req,
1006                                           RSTRING_PTR(r_key), key_len,
1007                                           field, len);
1008         nxt_unit_free(NULL, field);
1009 
1010         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1011             goto fail;
1012         }
1013 
1014         return ST_CONTINUE;
1015     }
1016 
1017     value = RSTRING_PTR(r_value);
1018     value_end = value + RSTRING_LEN(r_value);
1019 
1020     pos = value;
1021 
1022     for ( ;; ) {
1023         pos = strchr(pos, '\n');
1024 
1025         if (pos == NULL) {
1026             break;
1027         }
1028 
1029         *rc = nxt_unit_response_add_field(headers_info->req,
1030                                           RSTRING_PTR(r_key), key_len,
1031                                           value, pos - value);
1032         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1033             goto fail;
1034         }
1035 
1036         pos++;
1037         value = pos;
1038     }
1039 
1040     if (value <= value_end) {
1041         *rc = nxt_unit_response_add_field(headers_info->req,
1042                                           RSTRING_PTR(r_key), key_len,
1043                                           value, value_end - value);
1044         if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1045             goto fail;
1046         }
1047     }
1048 
1049     return ST_CONTINUE;
1050 
1051 fail:
1052 
1053     *rc = NXT_UNIT_ERROR;
1054 
1055     return ST_STOP;
1056 }
1057 
1058 
1059 static int
nxt_ruby_rack_result_body(nxt_unit_request_info_t * req,VALUE result)1060 nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
1061 {
1062     int    rc;
1063     VALUE  fn, body;
1064 
1065     body = rb_ary_entry(result, 2);
1066 
1067     if (rb_respond_to(body, rb_intern("to_path"))) {
1068 
1069         fn = rb_funcall(body, rb_intern("to_path"), 0);
1070         if (nxt_slow_path(TYPE(fn) != T_STRING)) {
1071             nxt_unit_req_error(req,
1072                                "Ruby: Failed to get 'body' file path from "
1073                                "application");
1074 
1075             return NXT_UNIT_ERROR;
1076         }
1077 
1078         rc = nxt_ruby_rack_result_body_file_write(req, fn);
1079         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1080             return rc;
1081         }
1082 
1083     } else if (rb_respond_to(body, rb_intern("each"))) {
1084         rb_block_call(body, rb_intern("each"), 0, 0,
1085                       nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);
1086 
1087     } else {
1088         nxt_unit_req_error(req,
1089                            "Ruby: Invalid response 'body' format "
1090                            "from application");
1091 
1092         return NXT_UNIT_ERROR;
1093     }
1094 
1095     if (rb_respond_to(body, rb_intern("close"))) {
1096         rb_funcall(body, rb_intern("close"), 0);
1097     }
1098 
1099     return NXT_UNIT_OK;
1100 }
1101 
1102 
/* Read cursor over an open file that is streamed as a response body. */
typedef struct {
    int    fd;      /* descriptor the data is read from with pread() */
    off_t  pos;     /* file offset of the next read */
    off_t  rest;    /* bytes still expected; 0 is reported as end of file */
} nxt_ruby_rack_file_t;
1108 
1109 
1110 static ssize_t
nxt_ruby_rack_file_read(nxt_unit_read_info_t * read_info,void * dst,size_t size)1111 nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
1112 {
1113     ssize_t               res;
1114     nxt_ruby_rack_file_t  *file;
1115 
1116     file = read_info->data;
1117 
1118     size = nxt_min(size, (size_t) file->rest);
1119 
1120     res = pread(file->fd, dst, size, file->pos);
1121 
1122     if (res >= 0) {
1123         file->pos += res;
1124         file->rest -= res;
1125 
1126         if (size > (size_t) res) {
1127             file->rest = 0;
1128         }
1129     }
1130 
1131     read_info->eof = file->rest == 0;
1132 
1133     return res;
1134 }
1135 
1136 
1137 typedef struct {
1138     nxt_unit_read_info_t     read_info;
1139     nxt_unit_request_info_t  *req;
1140 } nxt_ruby_read_info_t;
1141 
1142 
1143 static int
nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t * req,VALUE filepath)1144 nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
1145     VALUE filepath)
1146 {
1147     int                   fd, rc;
1148     struct stat           finfo;
1149     nxt_ruby_rack_file_t  ruby_file;
1150     nxt_ruby_read_info_t  ri;
1151 
1152     fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
1153     if (nxt_slow_path(fd == -1)) {
1154         nxt_unit_req_error(req,
1155                            "Ruby: Failed to open content file \"%s\": %s (%d)",
1156                            RSTRING_PTR(filepath), strerror(errno), errno);
1157 
1158         return NXT_UNIT_ERROR;
1159     }
1160 
1161     rc = fstat(fd, &finfo);
1162     if (nxt_slow_path(rc == -1)) {
1163         nxt_unit_req_error(req,
1164                            "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
1165                            RSTRING_PTR(filepath), strerror(errno), errno);
1166 
1167         close(fd);
1168 
1169         return NXT_UNIT_ERROR;
1170     }
1171 
1172     ruby_file.fd = fd;
1173     ruby_file.pos = 0;
1174     ruby_file.rest = finfo.st_size;
1175 
1176     ri.read_info.read = nxt_ruby_rack_file_read;
1177     ri.read_info.eof = ruby_file.rest == 0;
1178     ri.read_info.buf_size = ruby_file.rest;
1179     ri.read_info.data = &ruby_file;
1180     ri.req = req;
1181 
1182     rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
1183                                                &ri,
1184                                                nxt_ruby_ubf,
1185                                                req->ctx);
1186 
1187     close(fd);
1188 
1189     return rc;
1190 }
1191 
1192 
1193 static void *
nxt_ruby_response_write_cb(void * data)1194 nxt_ruby_response_write_cb(void *data)
1195 {
1196     int                   rc;
1197     nxt_ruby_read_info_t  *ri;
1198 
1199     ri = data;
1200 
1201     rc = nxt_unit_response_write_cb(ri->req, &ri->read_info);
1202     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1203         nxt_unit_req_error(ri->req, "Ruby: Failed to write content file.");
1204     }
1205 
1206     return (void *) (intptr_t) rc;
1207 }
1208 
1209 
1210 typedef struct {
1211     VALUE                    body;
1212     nxt_unit_request_info_t  *req;
1213 } nxt_ruby_write_info_t;
1214 
1215 
1216 static VALUE
nxt_ruby_rack_result_body_each(VALUE body,VALUE arg,int argc,const VALUE * argv,VALUE blockarg)1217 nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
1218     const VALUE *argv, VALUE blockarg)
1219 {
1220     nxt_ruby_write_info_t  wi;
1221 
1222     if (TYPE(body) != T_STRING) {
1223         return Qnil;
1224     }
1225 
1226     wi.body = body;
1227     wi.req = (void *) (uintptr_t) arg;
1228 
1229     (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
1230                                       (void *) (uintptr_t) &wi,
1231                                       nxt_ruby_ubf, wi.req->ctx);
1232 
1233     return Qnil;
1234 }
1235 
1236 
1237 static void *
nxt_ruby_response_write(void * data)1238 nxt_ruby_response_write(void *data)
1239 {
1240     int                    rc;
1241     nxt_ruby_write_info_t  *wi;
1242 
1243     wi = data;
1244 
1245     rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body),
1246                                  RSTRING_LEN(wi->body));
1247     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1248         nxt_unit_req_error(wi->req,
1249                            "Ruby: Failed to write 'body' from application");
1250     }
1251 
1252     return (void *) (intptr_t) rc;
1253 }
1254 
1255 
1256 static void
nxt_ruby_exception_log(nxt_unit_request_info_t * req,uint32_t level,const char * desc)1257 nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
1258     const char *desc)
1259 {
1260     int    i;
1261     VALUE  err, ary, eclass, msg;
1262 
1263     nxt_unit_req_log(req, level, "Ruby: %s", desc);
1264 
1265     err = rb_errinfo();
1266     if (nxt_slow_path(err == Qnil)) {
1267         return;
1268     }
1269 
1270     eclass = rb_class_name(rb_class_of(err));
1271 
1272     msg = rb_funcall(err, rb_intern("message"), 0);
1273     ary = rb_funcall(err, rb_intern("backtrace"), 0);
1274 
1275     if (RARRAY_LEN(ary) == 0) {
1276         nxt_unit_req_log(req, level, "Ruby: %s (%s)", RSTRING_PTR(msg),
1277                          RSTRING_PTR(eclass));
1278 
1279         return;
1280     }
1281 
1282     nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
1283                      RSTRING_PTR(RARRAY_PTR(ary)[0]),
1284                      RSTRING_PTR(msg), RSTRING_PTR(eclass));
1285 
1286     for (i = 1; i < RARRAY_LEN(ary); i++) {
1287         nxt_unit_req_log(req, level, "from %s",
1288                          RSTRING_PTR(RARRAY_PTR(ary)[i]));
1289     }
1290 }
1291 
1292 
1293 static void
nxt_ruby_ctx_done(nxt_ruby_ctx_t * rctx)1294 nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
1295 {
1296     if (rctx->io_input != Qnil) {
1297         rb_gc_unregister_address(&rctx->io_input);
1298     }
1299 
1300     if (rctx->io_error != Qnil) {
1301         rb_gc_unregister_address(&rctx->io_error);
1302     }
1303 
1304     if (rctx->env != Qnil) {
1305         rb_gc_unregister_address(&rctx->env);
1306     }
1307 }
1308 
1309 
1310 static void
nxt_ruby_atexit(void)1311 nxt_ruby_atexit(void)
1312 {
1313     if (nxt_ruby_rackup != Qnil) {
1314         rb_gc_unregister_address(&nxt_ruby_rackup);
1315     }
1316 
1317     if (nxt_ruby_call != Qnil) {
1318         rb_gc_unregister_address(&nxt_ruby_call);
1319     }
1320 
1321     if (nxt_ruby_hook_procs != Qnil) {
1322         rb_gc_unregister_address(&nxt_ruby_hook_procs);
1323     }
1324 
1325     nxt_ruby_done_strings();
1326 
1327     ruby_cleanup(0);
1328 }
1329 
1330 
1331 static int
nxt_ruby_ready_handler(nxt_unit_ctx_t * ctx)1332 nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
1333 {
1334     VALUE                res;
1335     uint32_t             i;
1336     nxt_ruby_ctx_t       *rctx;
1337     nxt_ruby_app_conf_t  *c;
1338 
1339     c = ctx->unit->data;
1340 
1341     if (c->threads <= 1) {
1342         return NXT_UNIT_OK;
1343     }
1344 
1345     for (i = 0; i < c->threads - 1; i++) {
1346         rctx = &nxt_ruby_ctxs[i];
1347 
1348         rctx->ctx = ctx;
1349 
1350         res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx);
1351 
1352         if (nxt_fast_path(res != Qnil)) {
1353             nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
1354 
1355             rctx->thread = res;
1356 
1357         } else {
1358             nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1));
1359 
1360             return NXT_UNIT_ERROR;
1361         }
1362     }
1363 
1364     return NXT_UNIT_OK;
1365 }
1366 
1367 
1368 static void *
nxt_ruby_thread_create_gvl(void * rctx)1369 nxt_ruby_thread_create_gvl(void *rctx)
1370 {
1371     VALUE  res;
1372 
1373     res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
1374 
1375     return (void *) (uintptr_t) res;
1376 }
1377 
1378 
1379 static VALUE
nxt_ruby_thread_func(VALUE arg)1380 nxt_ruby_thread_func(VALUE arg)
1381 {
1382     int             state;
1383     nxt_unit_ctx_t  *ctx;
1384     nxt_ruby_ctx_t  *rctx;
1385 
1386     rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
1387 
1388     nxt_unit_debug(rctx->ctx, "worker thread start");
1389 
1390     ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);
1391     if (nxt_slow_path(ctx == NULL)) {
1392         goto fail;
1393     }
1394 
1395     if (nxt_ruby_hook_procs != Qnil) {
1396         rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);
1397         if (nxt_slow_path(state != 0)) {
1398             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
1399                                    "Failed to call on_thread_boot()");
1400         }
1401     }
1402 
1403     (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
1404                                       nxt_ruby_ubf, ctx);
1405 
1406     if (nxt_ruby_hook_procs != Qnil) {
1407         rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);
1408         if (nxt_slow_path(state != 0)) {
1409             nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
1410                                    "Failed to call on_thread_shutdown()");
1411         }
1412     }
1413 
1414     nxt_unit_done(ctx);
1415 
1416 fail:
1417 
1418     nxt_unit_debug(NULL, "worker thread end");
1419 
1420     return Qnil;
1421 }
1422 
1423 
/*
 * rb_thread_call_without_gvl() worker: runs nxt_unit_run() on ctx and
 * returns its result packed into the pointer result.
 */
static void *
nxt_ruby_unit_run(void *ctx)
{
    intptr_t  rc;

    rc = (intptr_t) nxt_unit_run(ctx);

    return (void *) rc;
}
1429 
1430 
/*
 * Unblocking function passed to every rb_thread_call_without_gvl() call
 * in this file.  It performs no unblocking itself; it only records a
 * warning that it has been invoked.
 */
static void
nxt_ruby_ubf(void *ctx)
{
    nxt_unit_warn(ctx, "Ruby: UBF");
}
1436 
1437 
1438 static int
nxt_ruby_init_threads(nxt_ruby_app_conf_t * c)1439 nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
1440 {
1441     int             state;
1442     uint32_t        i;
1443     nxt_ruby_ctx_t  *rctx;
1444 
1445     if (c->threads <= 1) {
1446         return NXT_UNIT_OK;
1447     }
1448 
1449     nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
1450                                           * (c->threads - 1));
1451     if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
1452         nxt_unit_alert(NULL, "Failed to allocate run contexts array");
1453 
1454         return NXT_UNIT_ERROR;
1455     }
1456 
1457     for (i = 0; i < c->threads - 1; i++) {
1458         rctx = &nxt_ruby_ctxs[i];
1459 
1460         rctx->env = Qnil;
1461         rctx->io_input = Qnil;
1462         rctx->io_error = Qnil;
1463         rctx->thread = Qnil;
1464     }
1465 
1466     for (i = 0; i < c->threads - 1; i++) {
1467         rctx = &nxt_ruby_ctxs[i];
1468 
1469         rctx->env = rb_protect(nxt_ruby_rack_env_create,
1470                                (VALUE) (uintptr_t) rctx, &state);
1471         if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
1472             nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
1473                                    "Failed to create 'environ' variable");
1474             return NXT_UNIT_ERROR;
1475         }
1476     }
1477 
1478     return NXT_UNIT_OK;
1479 }
1480 
1481 
1482 static void
nxt_ruby_join_threads(nxt_unit_ctx_t * ctx,nxt_ruby_app_conf_t * c)1483 nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
1484 {
1485     uint32_t        i;
1486     nxt_ruby_ctx_t  *rctx;
1487 
1488     if (nxt_ruby_ctxs == NULL) {
1489         return;
1490     }
1491 
1492     for (i = 0; i < c->threads - 1; i++) {
1493         rctx = &nxt_ruby_ctxs[i];
1494 
1495         if (rctx->thread != Qnil) {
1496             rb_funcall(rctx->thread, rb_intern("join"), 0);
1497 
1498             nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
1499 
1500         } else {
1501             nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
1502         }
1503     }
1504 
1505     for (i = 0; i < c->threads - 1; i++) {
1506         nxt_ruby_ctx_done(&nxt_ruby_ctxs[i]);
1507     }
1508 
1509     nxt_unit_free(ctx, nxt_ruby_ctxs);
1510 }
1511