1 /*
2 * Copyright (C) Alexander Borisov
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include <ruby/nxt_ruby.h>
7
8 #include <nxt_unit.h>
9 #include <nxt_unit_request.h>
10
11 #include <ruby/thread.h>
12
13 #include NXT_RUBY_MOUNTS_H
14
15 #include <locale.h>
16
17
18 #define NXT_RUBY_RACK_API_VERSION_MAJOR 1
19 #define NXT_RUBY_RACK_API_VERSION_MINOR 3
20
21
22 typedef struct {
23 nxt_task_t *task;
24 nxt_str_t *script;
25 nxt_ruby_ctx_t *rctx;
26 } nxt_ruby_rack_init_t;
27
28
29 static nxt_int_t nxt_ruby_start(nxt_task_t *task,
30 nxt_process_data_t *data);
31 static VALUE nxt_ruby_init_basic(VALUE arg);
32
33 static VALUE nxt_ruby_hook_procs_load(VALUE path);
34 static VALUE nxt_ruby_hook_register(VALUE arg);
35 static VALUE nxt_ruby_hook_call(VALUE name);
36
37 static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
38
39 static VALUE nxt_ruby_require_rubygems(VALUE arg);
40 static VALUE nxt_ruby_bundler_setup(VALUE arg);
41 static VALUE nxt_ruby_require_rack(VALUE arg);
42 static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
43 static VALUE nxt_ruby_rack_env_create(VALUE arg);
44 static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
45 static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
46 static void *nxt_ruby_request_handler_gvl(void *req);
47 static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
48 static void *nxt_ruby_thread_create_gvl(void *rctx);
49 static VALUE nxt_ruby_thread_func(VALUE arg);
50 static void *nxt_ruby_unit_run(void *ctx);
51 static void nxt_ruby_ubf(void *ctx);
52 static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
53 static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
54 nxt_ruby_app_conf_t *c);
55
56 static VALUE nxt_ruby_rack_app_run(VALUE arg);
57 static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
58 nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
59 nxt_unit_sptr_t *sptr, uint32_t len);
60 static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
61 VALUE result);
62 static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
63 VALUE result, nxt_int_t status);
64 static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
65 static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
66 static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
67 VALUE result);
68 static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
69 VALUE filepath);
70 static void *nxt_ruby_response_write_cb(void *read_info);
71 static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
72 int argc, const VALUE *argv, VALUE blockarg);
73 static void *nxt_ruby_response_write(void *body);
74
75 static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
76 uint32_t level, const char *desc);
77
78 static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
79 static void nxt_ruby_atexit(void);
80
81
82 static uint32_t compat[] = {
83 NXT_VERNUM, NXT_DEBUG,
84 };
85
86 static VALUE nxt_ruby_hook_procs;
87 static VALUE nxt_ruby_rackup;
88 static VALUE nxt_ruby_call;
89
90 static uint32_t nxt_ruby_threads;
91 static nxt_ruby_ctx_t *nxt_ruby_ctxs;
92
93 NXT_EXPORT nxt_app_module_t nxt_app_module = {
94 sizeof(compat),
95 compat,
96 nxt_string("ruby"),
97 ruby_version,
98 nxt_ruby_mounts,
99 nxt_nitems(nxt_ruby_mounts),
100 NULL,
101 nxt_ruby_start,
102 };
103
104 typedef struct {
105 nxt_str_t string;
106 VALUE *v;
107 } nxt_ruby_string_t;
108
109 static VALUE nxt_rb_80_str;
110 static VALUE nxt_rb_content_length_str;
111 static VALUE nxt_rb_content_type_str;
112 static VALUE nxt_rb_http_str;
113 static VALUE nxt_rb_https_str;
114 static VALUE nxt_rb_path_info_str;
115 static VALUE nxt_rb_query_string_str;
116 static VALUE nxt_rb_rack_url_scheme_str;
117 static VALUE nxt_rb_remote_addr_str;
118 static VALUE nxt_rb_request_method_str;
119 static VALUE nxt_rb_request_uri_str;
120 static VALUE nxt_rb_server_addr_str;
121 static VALUE nxt_rb_server_name_str;
122 static VALUE nxt_rb_server_port_str;
123 static VALUE nxt_rb_server_protocol_str;
124 static VALUE nxt_rb_on_worker_boot;
125 static VALUE nxt_rb_on_worker_shutdown;
126 static VALUE nxt_rb_on_thread_boot;
127 static VALUE nxt_rb_on_thread_shutdown;
128
129 static nxt_ruby_string_t nxt_rb_strings[] = {
130 { nxt_string("80"), &nxt_rb_80_str },
131 { nxt_string("CONTENT_LENGTH"), &nxt_rb_content_length_str },
132 { nxt_string("CONTENT_TYPE"), &nxt_rb_content_type_str },
133 { nxt_string("http"), &nxt_rb_http_str },
134 { nxt_string("https"), &nxt_rb_https_str },
135 { nxt_string("PATH_INFO"), &nxt_rb_path_info_str },
136 { nxt_string("QUERY_STRING"), &nxt_rb_query_string_str },
137 { nxt_string("rack.url_scheme"), &nxt_rb_rack_url_scheme_str },
138 { nxt_string("REMOTE_ADDR"), &nxt_rb_remote_addr_str },
139 { nxt_string("REQUEST_METHOD"), &nxt_rb_request_method_str },
140 { nxt_string("REQUEST_URI"), &nxt_rb_request_uri_str },
141 { nxt_string("SERVER_ADDR"), &nxt_rb_server_addr_str },
142 { nxt_string("SERVER_NAME"), &nxt_rb_server_name_str },
143 { nxt_string("SERVER_PORT"), &nxt_rb_server_port_str },
144 { nxt_string("SERVER_PROTOCOL"), &nxt_rb_server_protocol_str },
145 { nxt_string("on_worker_boot"), &nxt_rb_on_worker_boot },
146 { nxt_string("on_worker_shutdown"), &nxt_rb_on_worker_shutdown },
147 { nxt_string("on_thread_boot"), &nxt_rb_on_thread_boot },
148 { nxt_string("on_thread_shutdown"), &nxt_rb_on_thread_shutdown },
149 { nxt_null_string, NULL },
150 };
151
152
153 static int
nxt_ruby_init_strings(void)154 nxt_ruby_init_strings(void)
155 {
156 VALUE v;
157 nxt_ruby_string_t *pstr;
158
159 pstr = nxt_rb_strings;
160
161 while (pstr->string.start != NULL) {
162 v = rb_str_new_static((char *) pstr->string.start, pstr->string.length);
163
164 if (nxt_slow_path(v == Qnil)) {
165 nxt_unit_alert(NULL, "Ruby: failed to create const string '%.*s'",
166 (int) pstr->string.length,
167 (char *) pstr->string.start);
168
169 return NXT_UNIT_ERROR;
170 }
171
172 *pstr->v = v;
173
174 rb_gc_register_address(pstr->v);
175
176 pstr++;
177 }
178
179 return NXT_UNIT_OK;
180 }
181
182
183 static void
nxt_ruby_done_strings(void)184 nxt_ruby_done_strings(void)
185 {
186 nxt_ruby_string_t *pstr;
187
188 pstr = nxt_rb_strings;
189
190 while (pstr->string.start != NULL) {
191 rb_gc_unregister_address(pstr->v);
192
193 *pstr->v = Qnil;
194
195 pstr++;
196 }
197 }
198
199
200 static VALUE
nxt_ruby_hook_procs_load(VALUE path)201 nxt_ruby_hook_procs_load(VALUE path)
202 {
203 VALUE module, file, file_obj;
204
205 module = rb_define_module("Unit");
206
207 nxt_ruby_hook_procs = rb_hash_new();
208
209 rb_gc_register_address(&nxt_ruby_hook_procs);
210
211 rb_define_module_function(module, "on_worker_boot",
212 &nxt_ruby_hook_register, 0);
213 rb_define_module_function(module, "on_worker_shutdown",
214 &nxt_ruby_hook_register, 0);
215 rb_define_module_function(module, "on_thread_boot",
216 &nxt_ruby_hook_register, 0);
217 rb_define_module_function(module, "on_thread_shutdown",
218 &nxt_ruby_hook_register, 0);
219
220 file = rb_const_get(rb_cObject, rb_intern("File"));
221 file_obj = rb_funcall(file, rb_intern("read"), 1, path);
222
223 return rb_funcall(module, rb_intern("module_eval"), 3, file_obj, path,
224 INT2NUM(1));
225 }
226
227
228 static VALUE
nxt_ruby_hook_register(VALUE arg)229 nxt_ruby_hook_register(VALUE arg)
230 {
231 VALUE kernel, callee, callee_str;
232
233 rb_need_block();
234
235 kernel = rb_const_get(rb_cObject, rb_intern("Kernel"));
236 callee = rb_funcall(kernel, rb_intern("__callee__"), 0);
237 callee_str = rb_funcall(callee, rb_intern("to_s"), 0);
238
239 rb_hash_aset(nxt_ruby_hook_procs, callee_str, rb_block_proc());
240
241 return Qnil;
242 }
243
244
245 static VALUE
nxt_ruby_hook_call(VALUE name)246 nxt_ruby_hook_call(VALUE name)
247 {
248 VALUE proc;
249
250 proc = rb_hash_lookup(nxt_ruby_hook_procs, name);
251 if (proc == Qnil) {
252 return Qnil;
253 }
254
255 return rb_funcall(proc, rb_intern("call"), 0);
256 }
257
258
259 static nxt_int_t
nxt_ruby_start(nxt_task_t * task,nxt_process_data_t * data)260 nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
261 {
262 int state, rc;
263 VALUE res, path;
264 nxt_ruby_ctx_t ruby_ctx;
265 nxt_unit_ctx_t *unit_ctx;
266 nxt_unit_init_t ruby_unit_init;
267 nxt_ruby_app_conf_t *c;
268 nxt_ruby_rack_init_t rack_init;
269 nxt_common_app_conf_t *conf;
270
271 static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
272
273 signal(SIGINT, SIG_IGN);
274
275 conf = data->app;
276 c = &conf->u.ruby;
277
278 nxt_ruby_threads = c->threads;
279
280 setlocale(LC_CTYPE, "");
281
282 RUBY_INIT_STACK
283 ruby_init();
284 ruby_options(2, argv);
285 ruby_script("NGINX_Unit");
286
287 ruby_ctx.env = Qnil;
288 ruby_ctx.io_input = Qnil;
289 ruby_ctx.io_error = Qnil;
290 ruby_ctx.thread = Qnil;
291 ruby_ctx.ctx = NULL;
292 ruby_ctx.req = NULL;
293
294 rack_init.task = task;
295 rack_init.script = &c->script;
296 rack_init.rctx = &ruby_ctx;
297
298 nxt_ruby_init_strings();
299
300 res = rb_protect(nxt_ruby_init_basic,
301 (VALUE) (uintptr_t) &rack_init, &state);
302 if (nxt_slow_path(res == Qnil || state != 0)) {
303 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
304 "Failed to init basic variables");
305 return NXT_ERROR;
306 }
307
308 nxt_ruby_call = Qnil;
309 nxt_ruby_hook_procs = Qnil;
310
311 if (c->hooks.start != NULL) {
312 path = rb_str_new((const char *) c->hooks.start,
313 (long) c->hooks.length);
314
315 rb_protect(nxt_ruby_hook_procs_load, path, &state);
316 rb_str_free(path);
317 if (nxt_slow_path(state != 0)) {
318 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
319 "Failed to setup hooks");
320 return NXT_ERROR;
321 }
322 }
323
324 if (nxt_ruby_hook_procs != Qnil) {
325 rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_boot, &state);
326 if (nxt_slow_path(state != 0)) {
327 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
328 "Failed to call on_worker_boot()");
329 return NXT_ERROR;
330 }
331 }
332
333 nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
334 if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
335 return NXT_ERROR;
336 }
337
338 rb_gc_register_address(&nxt_ruby_rackup);
339
340 nxt_ruby_call = rb_intern("call");
341 if (nxt_slow_path(nxt_ruby_call == Qnil)) {
342 nxt_alert(task, "Ruby: Unable to find rack entry point");
343
344 goto fail;
345 }
346
347 rb_gc_register_address(&nxt_ruby_call);
348
349 ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
350 (VALUE) (uintptr_t) &ruby_ctx, &state);
351 if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
352 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
353 "Failed to create 'environ' variable");
354 goto fail;
355 }
356
357 rc = nxt_ruby_init_threads(c);
358 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
359 goto fail;
360 }
361
362 nxt_unit_default_init(task, &ruby_unit_init, conf);
363
364 ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
365 ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
366 ruby_unit_init.data = c;
367 ruby_unit_init.ctx_data = &ruby_ctx;
368
369 unit_ctx = nxt_unit_init(&ruby_unit_init);
370 if (nxt_slow_path(unit_ctx == NULL)) {
371 goto fail;
372 }
373
374 if (nxt_ruby_hook_procs != Qnil) {
375 rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);
376 if (nxt_slow_path(state != 0)) {
377 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
378 "Failed to call on_thread_boot()");
379 }
380 }
381
382 rc = (intptr_t) rb_thread_call_without_gvl2(nxt_ruby_unit_run, unit_ctx,
383 nxt_ruby_ubf, unit_ctx);
384
385 if (nxt_ruby_hook_procs != Qnil) {
386 rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);
387 if (nxt_slow_path(state != 0)) {
388 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
389 "Failed to call on_thread_shutdown()");
390 }
391 }
392
393 nxt_ruby_join_threads(unit_ctx, c);
394
395 if (nxt_ruby_hook_procs != Qnil) {
396 rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_shutdown, &state);
397 if (nxt_slow_path(state != 0)) {
398 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
399 "Failed to call on_worker_shutdown()");
400 }
401 }
402
403 nxt_unit_done(unit_ctx);
404
405 nxt_ruby_ctx_done(&ruby_ctx);
406
407 nxt_ruby_atexit();
408
409 exit(rc);
410
411 return NXT_OK;
412
413 fail:
414
415 nxt_ruby_join_threads(NULL, c);
416
417 nxt_ruby_ctx_done(&ruby_ctx);
418
419 nxt_ruby_atexit();
420
421 return NXT_ERROR;
422 }
423
424
425 static VALUE
nxt_ruby_init_basic(VALUE arg)426 nxt_ruby_init_basic(VALUE arg)
427 {
428 int state;
429 nxt_ruby_rack_init_t *rack_init;
430
431 rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;
432
433 state = rb_enc_find_index("encdb");
434 if (nxt_slow_path(state == 0)) {
435 nxt_alert(rack_init->task,
436 "Ruby: Failed to find encoding index 'encdb'");
437
438 return Qnil;
439 }
440
441 rb_funcall(rb_cObject, rb_intern("require"), 1,
442 rb_str_new2("enc/trans/transdb"));
443
444 return arg;
445 }
446
447
448 static VALUE
nxt_ruby_rack_init(nxt_ruby_rack_init_t * rack_init)449 nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
450 {
451 int state;
452 VALUE rackup, err;
453
454 rb_protect(nxt_ruby_require_rubygems, Qnil, &state);
455 if (nxt_slow_path(state != 0)) {
456 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
457 "Failed to require 'rubygems' package");
458 return Qnil;
459 }
460
461 rb_protect(nxt_ruby_bundler_setup, Qnil, &state);
462 if (state != 0) {
463 err = rb_errinfo();
464
465 if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) {
466 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
467 "Failed to require 'bundler/setup' package");
468 return Qnil;
469 }
470
471 rb_set_errinfo(Qnil);
472 }
473
474 rb_protect(nxt_ruby_require_rack, Qnil, &state);
475 if (nxt_slow_path(state != 0)) {
476 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
477 "Failed to require 'rack' package");
478 return Qnil;
479 }
480
481 rackup = rb_protect(nxt_ruby_rack_parse_script,
482 (VALUE) (uintptr_t) rack_init, &state);
483
484 if (nxt_slow_path(state != 0)) {
485 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
486 "Failed to parse rack script");
487 return Qnil;
488 }
489
490 if (TYPE(rackup) != T_ARRAY) {
491 return rackup;
492 }
493
494 if (nxt_slow_path(RARRAY_LEN(rackup) < 1)) {
495 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Invalid rack config file");
496 return Qnil;
497 }
498
499 return RARRAY_PTR(rackup)[0];
500 }
501
502
503 static VALUE
nxt_ruby_require_rubygems(VALUE arg)504 nxt_ruby_require_rubygems(VALUE arg)
505 {
506 return rb_funcall(rb_cObject, rb_intern("require"), 1,
507 rb_str_new2("rubygems"));
508 }
509
510
511 static VALUE
nxt_ruby_bundler_setup(VALUE arg)512 nxt_ruby_bundler_setup(VALUE arg)
513 {
514 return rb_funcall(rb_cObject, rb_intern("require"), 1,
515 rb_str_new2("bundler/setup"));
516 }
517
518
519 static VALUE
nxt_ruby_require_rack(VALUE arg)520 nxt_ruby_require_rack(VALUE arg)
521 {
522 return rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("rack"));
523 }
524
525
526 static VALUE
nxt_ruby_rack_parse_script(VALUE ctx)527 nxt_ruby_rack_parse_script(VALUE ctx)
528 {
529 VALUE script, res, rack, builder;
530 nxt_ruby_rack_init_t *rack_init;
531
532 rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;
533
534 rack = rb_const_get(rb_cObject, rb_intern("Rack"));
535 builder = rb_const_get(rack, rb_intern("Builder"));
536
537 script = rb_str_new((const char *) rack_init->script->start,
538 (long) rack_init->script->length);
539
540 res = rb_funcall(builder, rb_intern("parse_file"), 1, script);
541
542 rb_str_free(script);
543
544 return res;
545 }
546
547
548 static VALUE
nxt_ruby_rack_env_create(VALUE arg)549 nxt_ruby_rack_env_create(VALUE arg)
550 {
551 int rc;
552 VALUE hash_env, version;
553 nxt_ruby_ctx_t *rctx;
554
555 rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
556
557 rc = nxt_ruby_init_io(rctx);
558 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559 return Qnil;
560 }
561
562 hash_env = rb_hash_new();
563
564 rb_hash_aset(hash_env, rb_str_new2("SERVER_SOFTWARE"),
565 rb_str_new((const char *) nxt_server.start,
566 (long) nxt_server.length));
567
568 version = rb_ary_new();
569
570 rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR));
571 rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));
572
573 rb_hash_aset(hash_env, rb_str_new2("SCRIPT_NAME"), rb_str_new("", 0));
574 rb_hash_aset(hash_env, rb_str_new2("rack.version"), version);
575 rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input);
576 rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error);
577 rb_hash_aset(hash_env, rb_str_new2("rack.multithread"),
578 nxt_ruby_threads > 1 ? Qtrue : Qfalse);
579 rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue);
580 rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse);
581 rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse);
582 rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil);
583 rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil);
584
585 rctx->env = hash_env;
586
587 rb_gc_register_address(&rctx->env);
588
589 return hash_env;
590 }
591
592
593 static int
nxt_ruby_init_io(nxt_ruby_ctx_t * rctx)594 nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
595 {
596 VALUE io_input, io_error;
597
598 io_input = nxt_ruby_stream_io_input_init();
599
600 rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1,
601 (VALUE) (uintptr_t) rctx);
602 if (nxt_slow_path(rctx->io_input == Qnil)) {
603 nxt_unit_alert(NULL,
604 "Ruby: Failed to create environment 'rack.input' var");
605
606 return NXT_UNIT_ERROR;
607 }
608
609 rb_gc_register_address(&rctx->io_input);
610
611 io_error = nxt_ruby_stream_io_error_init();
612
613 rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1,
614 (VALUE) (uintptr_t) rctx);
615 if (nxt_slow_path(rctx->io_error == Qnil)) {
616 nxt_unit_alert(NULL,
617 "Ruby: Failed to create environment 'rack.error' var");
618
619 return NXT_UNIT_ERROR;
620 }
621
622 rb_gc_register_address(&rctx->io_error);
623
624 return NXT_UNIT_OK;
625 }
626
627
628 static void
nxt_ruby_request_handler(nxt_unit_request_info_t * req)629 nxt_ruby_request_handler(nxt_unit_request_info_t *req)
630 {
631 (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);
632 }
633
634
635 static void *
nxt_ruby_request_handler_gvl(void * data)636 nxt_ruby_request_handler_gvl(void *data)
637 {
638 int state;
639 VALUE res;
640 nxt_ruby_ctx_t *rctx;
641 nxt_unit_request_info_t *req;
642
643 req = data;
644
645 rctx = req->ctx->data;
646 rctx->req = req;
647
648 res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);
649 if (nxt_slow_path(res == Qnil || state != 0)) {
650 nxt_ruby_exception_log(req, NXT_LOG_ERR,
651 "Failed to run ruby script");
652
653 nxt_unit_request_done(req, NXT_UNIT_ERROR);
654
655 } else {
656 nxt_unit_request_done(req, NXT_UNIT_OK);
657 }
658
659 rctx->req = NULL;
660
661 return NULL;
662 }
663
664
665 static VALUE
nxt_ruby_rack_app_run(VALUE arg)666 nxt_ruby_rack_app_run(VALUE arg)
667 {
668 int rc;
669 VALUE env, result;
670 nxt_int_t status;
671 nxt_ruby_ctx_t *rctx;
672 nxt_unit_request_info_t *req;
673
674 req = (nxt_unit_request_info_t *) arg;
675
676 rctx = req->ctx->data;
677
678 env = rb_hash_dup(rctx->env);
679
680 rc = nxt_ruby_read_request(req, env);
681 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
682 nxt_unit_req_alert(req,
683 "Ruby: Failed to process incoming request");
684
685 goto fail;
686 }
687
688 result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
689 if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
690 nxt_unit_req_error(req,
691 "Ruby: Invalid response format from application");
692
693 goto fail;
694 }
695
696 if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
697 nxt_unit_req_error(req,
698 "Ruby: Invalid response format from application. "
699 "Need 3 entries [Status, Headers, Body]");
700
701 goto fail;
702 }
703
704 status = nxt_ruby_rack_result_status(req, result);
705 if (nxt_slow_path(status < 0)) {
706 nxt_unit_req_error(req,
707 "Ruby: Invalid response status from application.");
708
709 goto fail;
710 }
711
712 rc = nxt_ruby_rack_result_headers(req, result, status);
713 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
714 goto fail;
715 }
716
717 rc = nxt_ruby_rack_result_body(req, result);
718 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
719 goto fail;
720 }
721
722 rb_hash_delete(env, rb_obj_id(env));
723
724 return result;
725
726 fail:
727
728 rb_hash_delete(env, rb_obj_id(env));
729
730 return Qnil;
731 }
732
733
734 static int
nxt_ruby_read_request(nxt_unit_request_info_t * req,VALUE hash_env)735 nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
736 {
737 VALUE name;
738 uint32_t i;
739 nxt_unit_field_t *f;
740 nxt_unit_request_t *r;
741
742 r = req->request;
743
744 nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
745 r->method_length);
746 nxt_ruby_add_sptr(hash_env, nxt_rb_request_uri_str, &r->target,
747 r->target_length);
748 nxt_ruby_add_sptr(hash_env, nxt_rb_path_info_str, &r->path, r->path_length);
749 nxt_ruby_add_sptr(hash_env, nxt_rb_query_string_str, &r->query,
750 r->query_length);
751 nxt_ruby_add_sptr(hash_env, nxt_rb_server_protocol_str, &r->version,
752 r->version_length);
753 nxt_ruby_add_sptr(hash_env, nxt_rb_remote_addr_str, &r->remote,
754 r->remote_length);
755 nxt_ruby_add_sptr(hash_env, nxt_rb_server_addr_str, &r->local_addr,
756 r->local_addr_length);
757 nxt_ruby_add_sptr(hash_env, nxt_rb_server_name_str, &r->server_name,
758 r->server_name_length);
759
760 rb_hash_aset(hash_env, nxt_rb_server_port_str, nxt_rb_80_str);
761
762 rb_hash_aset(hash_env, nxt_rb_rack_url_scheme_str,
763 r->tls ? nxt_rb_https_str : nxt_rb_http_str);
764
765 for (i = 0; i < r->fields_count; i++) {
766 f = r->fields + i;
767
768 name = rb_str_new(nxt_unit_sptr_get(&f->name), f->name_length);
769
770 nxt_ruby_add_sptr(hash_env, name, &f->value, f->value_length);
771 }
772
773 if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
774 f = r->fields + r->content_length_field;
775
776 nxt_ruby_add_sptr(hash_env, nxt_rb_content_length_str,
777 &f->value, f->value_length);
778 }
779
780 if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
781 f = r->fields + r->content_type_field;
782
783 nxt_ruby_add_sptr(hash_env, nxt_rb_content_type_str,
784 &f->value, f->value_length);
785 }
786
787 return NXT_UNIT_OK;
788 }
789
790
791 nxt_inline void
nxt_ruby_add_sptr(VALUE hash_env,VALUE name,nxt_unit_sptr_t * sptr,uint32_t len)792 nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
793 nxt_unit_sptr_t *sptr, uint32_t len)
794 {
795 char *str;
796
797 str = nxt_unit_sptr_get(sptr);
798
799 rb_hash_aset(hash_env, name, rb_str_new(str, len));
800 }
801
802
803 static nxt_int_t
nxt_ruby_rack_result_status(nxt_unit_request_info_t * req,VALUE result)804 nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
805 {
806 VALUE status;
807
808 status = rb_ary_entry(result, 0);
809
810 if (TYPE(status) == T_FIXNUM) {
811 return FIX2INT(status);
812 }
813
814 if (TYPE(status) == T_STRING) {
815 return nxt_int_parse((u_char *) RSTRING_PTR(status),
816 RSTRING_LEN(status));
817 }
818
819 nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
820 "format from application");
821
822 return -2;
823 }
824
825
826 typedef struct {
827 int rc;
828 uint32_t fields;
829 uint32_t size;
830 nxt_unit_request_info_t *req;
831 } nxt_ruby_headers_info_t;
832
833
834 static int
nxt_ruby_rack_result_headers(nxt_unit_request_info_t * req,VALUE result,nxt_int_t status)835 nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
836 nxt_int_t status)
837 {
838 int rc;
839 VALUE headers;
840 nxt_ruby_headers_info_t headers_info;
841
842 headers = rb_ary_entry(result, 1);
843 if (nxt_slow_path(TYPE(headers) != T_HASH)) {
844 nxt_unit_req_error(req,
845 "Ruby: Invalid response 'headers' format from "
846 "application");
847
848 return NXT_UNIT_ERROR;
849 }
850
851 rc = NXT_UNIT_OK;
852
853 headers_info.rc = NXT_UNIT_OK;
854 headers_info.fields = 0;
855 headers_info.size = 0;
856 headers_info.req = req;
857
858 rb_hash_foreach(headers, nxt_ruby_hash_info,
859 (VALUE) (uintptr_t) &headers_info);
860 if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
861 return headers_info.rc;
862 }
863
864 rc = nxt_unit_response_init(req, status,
865 headers_info.fields, headers_info.size);
866 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
867 return rc;
868 }
869
870 rb_hash_foreach(headers, nxt_ruby_hash_add,
871 (VALUE) (uintptr_t) &headers_info);
872
873 return rc;
874 }
875
876
877 static int
nxt_ruby_hash_info(VALUE r_key,VALUE r_value,VALUE arg)878 nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
879 {
880 const char *value, *value_end, *pos;
881 nxt_ruby_headers_info_t *headers_info;
882
883 headers_info = (void *) (uintptr_t) arg;
884
885 if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
886 nxt_unit_req_error(headers_info->req,
887 "Ruby: Wrong header entry 'key' from application");
888
889 goto fail;
890 }
891
892 if (nxt_slow_path(TYPE(r_value) != T_STRING && TYPE(r_value) != T_ARRAY)) {
893 nxt_unit_req_error(headers_info->req,
894 "Ruby: Wrong header entry 'value' from application");
895
896 goto fail;
897 }
898
899 if (TYPE(r_value) == T_ARRAY) {
900 int i;
901 int arr_len = RARRAY_LEN(r_value);
902 VALUE item;
903 size_t len = 0;
904
905 for (i = 0; i < arr_len; i++) {
906 item = rb_ary_entry(r_value, i);
907 if (TYPE(item) != T_STRING) {
908 nxt_unit_req_error(headers_info->req,
909 "Ruby: Wrong header entry in 'value' array "
910 "from application");
911 goto fail;
912 }
913
914 len += RSTRING_LEN(item) + 2; /* +2 for '; ' */
915 }
916
917 if (arr_len > 0) {
918 len -= 2;
919 }
920
921 headers_info->fields++;
922 headers_info->size += RSTRING_LEN(r_key) + len;
923
924 return ST_CONTINUE;
925 }
926
927 value = RSTRING_PTR(r_value);
928 value_end = value + RSTRING_LEN(r_value);
929
930 pos = value;
931
932 for ( ;; ) {
933 pos = strchr(pos, '\n');
934
935 if (pos == NULL) {
936 break;
937 }
938
939 headers_info->fields++;
940 headers_info->size += RSTRING_LEN(r_key) + (pos - value);
941
942 pos++;
943 value = pos;
944 }
945
946 if (value <= value_end) {
947 headers_info->fields++;
948 headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
949 }
950
951 return ST_CONTINUE;
952
953 fail:
954
955 headers_info->rc = NXT_UNIT_ERROR;
956
957 return ST_STOP;
958 }
959
960
961 static int
nxt_ruby_hash_add(VALUE r_key,VALUE r_value,VALUE arg)962 nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
963 {
964 int *rc;
965 uint32_t key_len;
966 const char *value, *value_end, *pos;
967 nxt_ruby_headers_info_t *headers_info;
968
969 headers_info = (void *) (uintptr_t) arg;
970 rc = &headers_info->rc;
971
972 key_len = RSTRING_LEN(r_key);
973
974 if (TYPE(r_value) == T_ARRAY) {
975 int i;
976 int arr_len = RARRAY_LEN(r_value);
977 char *field, *p;
978 VALUE item;
979 size_t len = 0;
980
981 for (i = 0; i < arr_len; i++) {
982 item = rb_ary_entry(r_value, i);
983
984 len += RSTRING_LEN(item) + 2; /* +2 for '; ' */
985 }
986
987 field = nxt_unit_malloc(NULL, len);
988 if (field == NULL) {
989 goto fail;
990 }
991
992 p = field;
993
994 for (i = 0; i < arr_len; i++) {
995 item = rb_ary_entry(r_value, i);
996
997 p = nxt_cpymem(p, RSTRING_PTR(item), RSTRING_LEN(item));
998 p = nxt_cpymem(p, "; ", 2);
999 }
1000
1001 if (arr_len > 0) {
1002 len -= 2;
1003 }
1004
1005 *rc = nxt_unit_response_add_field(headers_info->req,
1006 RSTRING_PTR(r_key), key_len,
1007 field, len);
1008 nxt_unit_free(NULL, field);
1009
1010 if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1011 goto fail;
1012 }
1013
1014 return ST_CONTINUE;
1015 }
1016
1017 value = RSTRING_PTR(r_value);
1018 value_end = value + RSTRING_LEN(r_value);
1019
1020 pos = value;
1021
1022 for ( ;; ) {
1023 pos = strchr(pos, '\n');
1024
1025 if (pos == NULL) {
1026 break;
1027 }
1028
1029 *rc = nxt_unit_response_add_field(headers_info->req,
1030 RSTRING_PTR(r_key), key_len,
1031 value, pos - value);
1032 if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1033 goto fail;
1034 }
1035
1036 pos++;
1037 value = pos;
1038 }
1039
1040 if (value <= value_end) {
1041 *rc = nxt_unit_response_add_field(headers_info->req,
1042 RSTRING_PTR(r_key), key_len,
1043 value, value_end - value);
1044 if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
1045 goto fail;
1046 }
1047 }
1048
1049 return ST_CONTINUE;
1050
1051 fail:
1052
1053 *rc = NXT_UNIT_ERROR;
1054
1055 return ST_STOP;
1056 }
1057
1058
1059 static int
nxt_ruby_rack_result_body(nxt_unit_request_info_t * req,VALUE result)1060 nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
1061 {
1062 int rc;
1063 VALUE fn, body;
1064
1065 body = rb_ary_entry(result, 2);
1066
1067 if (rb_respond_to(body, rb_intern("to_path"))) {
1068
1069 fn = rb_funcall(body, rb_intern("to_path"), 0);
1070 if (nxt_slow_path(TYPE(fn) != T_STRING)) {
1071 nxt_unit_req_error(req,
1072 "Ruby: Failed to get 'body' file path from "
1073 "application");
1074
1075 return NXT_UNIT_ERROR;
1076 }
1077
1078 rc = nxt_ruby_rack_result_body_file_write(req, fn);
1079 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1080 return rc;
1081 }
1082
1083 } else if (rb_respond_to(body, rb_intern("each"))) {
1084 rb_block_call(body, rb_intern("each"), 0, 0,
1085 nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);
1086
1087 } else {
1088 nxt_unit_req_error(req,
1089 "Ruby: Invalid response 'body' format "
1090 "from application");
1091
1092 return NXT_UNIT_ERROR;
1093 }
1094
1095 if (rb_respond_to(body, rb_intern("close"))) {
1096 rb_funcall(body, rb_intern("close"), 0);
1097 }
1098
1099 return NXT_UNIT_OK;
1100 }
1101
1102
1103 typedef struct {
1104 int fd;
1105 off_t pos;
1106 off_t rest;
1107 } nxt_ruby_rack_file_t;
1108
1109
1110 static ssize_t
nxt_ruby_rack_file_read(nxt_unit_read_info_t * read_info,void * dst,size_t size)1111 nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
1112 {
1113 ssize_t res;
1114 nxt_ruby_rack_file_t *file;
1115
1116 file = read_info->data;
1117
1118 size = nxt_min(size, (size_t) file->rest);
1119
1120 res = pread(file->fd, dst, size, file->pos);
1121
1122 if (res >= 0) {
1123 file->pos += res;
1124 file->rest -= res;
1125
1126 if (size > (size_t) res) {
1127 file->rest = 0;
1128 }
1129 }
1130
1131 read_info->eof = file->rest == 0;
1132
1133 return res;
1134 }
1135
1136
1137 typedef struct {
1138 nxt_unit_read_info_t read_info;
1139 nxt_unit_request_info_t *req;
1140 } nxt_ruby_read_info_t;
1141
1142
1143 static int
nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t * req,VALUE filepath)1144 nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
1145 VALUE filepath)
1146 {
1147 int fd, rc;
1148 struct stat finfo;
1149 nxt_ruby_rack_file_t ruby_file;
1150 nxt_ruby_read_info_t ri;
1151
1152 fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
1153 if (nxt_slow_path(fd == -1)) {
1154 nxt_unit_req_error(req,
1155 "Ruby: Failed to open content file \"%s\": %s (%d)",
1156 RSTRING_PTR(filepath), strerror(errno), errno);
1157
1158 return NXT_UNIT_ERROR;
1159 }
1160
1161 rc = fstat(fd, &finfo);
1162 if (nxt_slow_path(rc == -1)) {
1163 nxt_unit_req_error(req,
1164 "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
1165 RSTRING_PTR(filepath), strerror(errno), errno);
1166
1167 close(fd);
1168
1169 return NXT_UNIT_ERROR;
1170 }
1171
1172 ruby_file.fd = fd;
1173 ruby_file.pos = 0;
1174 ruby_file.rest = finfo.st_size;
1175
1176 ri.read_info.read = nxt_ruby_rack_file_read;
1177 ri.read_info.eof = ruby_file.rest == 0;
1178 ri.read_info.buf_size = ruby_file.rest;
1179 ri.read_info.data = &ruby_file;
1180 ri.req = req;
1181
1182 rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
1183 &ri,
1184 nxt_ruby_ubf,
1185 req->ctx);
1186
1187 close(fd);
1188
1189 return rc;
1190 }
1191
1192
1193 static void *
nxt_ruby_response_write_cb(void * data)1194 nxt_ruby_response_write_cb(void *data)
1195 {
1196 int rc;
1197 nxt_ruby_read_info_t *ri;
1198
1199 ri = data;
1200
1201 rc = nxt_unit_response_write_cb(ri->req, &ri->read_info);
1202 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1203 nxt_unit_req_error(ri->req, "Ruby: Failed to write content file.");
1204 }
1205
1206 return (void *) (intptr_t) rc;
1207 }
1208
1209
1210 typedef struct {
1211 VALUE body;
1212 nxt_unit_request_info_t *req;
1213 } nxt_ruby_write_info_t;
1214
1215
1216 static VALUE
nxt_ruby_rack_result_body_each(VALUE body,VALUE arg,int argc,const VALUE * argv,VALUE blockarg)1217 nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
1218 const VALUE *argv, VALUE blockarg)
1219 {
1220 nxt_ruby_write_info_t wi;
1221
1222 if (TYPE(body) != T_STRING) {
1223 return Qnil;
1224 }
1225
1226 wi.body = body;
1227 wi.req = (void *) (uintptr_t) arg;
1228
1229 (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
1230 (void *) (uintptr_t) &wi,
1231 nxt_ruby_ubf, wi.req->ctx);
1232
1233 return Qnil;
1234 }
1235
1236
1237 static void *
nxt_ruby_response_write(void * data)1238 nxt_ruby_response_write(void *data)
1239 {
1240 int rc;
1241 nxt_ruby_write_info_t *wi;
1242
1243 wi = data;
1244
1245 rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body),
1246 RSTRING_LEN(wi->body));
1247 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1248 nxt_unit_req_error(wi->req,
1249 "Ruby: Failed to write 'body' from application");
1250 }
1251
1252 return (void *) (intptr_t) rc;
1253 }
1254
1255
1256 static void
nxt_ruby_exception_log(nxt_unit_request_info_t * req,uint32_t level,const char * desc)1257 nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
1258 const char *desc)
1259 {
1260 int i;
1261 VALUE err, ary, eclass, msg;
1262
1263 nxt_unit_req_log(req, level, "Ruby: %s", desc);
1264
1265 err = rb_errinfo();
1266 if (nxt_slow_path(err == Qnil)) {
1267 return;
1268 }
1269
1270 eclass = rb_class_name(rb_class_of(err));
1271
1272 msg = rb_funcall(err, rb_intern("message"), 0);
1273 ary = rb_funcall(err, rb_intern("backtrace"), 0);
1274
1275 if (RARRAY_LEN(ary) == 0) {
1276 nxt_unit_req_log(req, level, "Ruby: %s (%s)", RSTRING_PTR(msg),
1277 RSTRING_PTR(eclass));
1278
1279 return;
1280 }
1281
1282 nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
1283 RSTRING_PTR(RARRAY_PTR(ary)[0]),
1284 RSTRING_PTR(msg), RSTRING_PTR(eclass));
1285
1286 for (i = 1; i < RARRAY_LEN(ary); i++) {
1287 nxt_unit_req_log(req, level, "from %s",
1288 RSTRING_PTR(RARRAY_PTR(ary)[i]));
1289 }
1290 }
1291
1292
1293 static void
nxt_ruby_ctx_done(nxt_ruby_ctx_t * rctx)1294 nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
1295 {
1296 if (rctx->io_input != Qnil) {
1297 rb_gc_unregister_address(&rctx->io_input);
1298 }
1299
1300 if (rctx->io_error != Qnil) {
1301 rb_gc_unregister_address(&rctx->io_error);
1302 }
1303
1304 if (rctx->env != Qnil) {
1305 rb_gc_unregister_address(&rctx->env);
1306 }
1307 }
1308
1309
1310 static void
nxt_ruby_atexit(void)1311 nxt_ruby_atexit(void)
1312 {
1313 if (nxt_ruby_rackup != Qnil) {
1314 rb_gc_unregister_address(&nxt_ruby_rackup);
1315 }
1316
1317 if (nxt_ruby_call != Qnil) {
1318 rb_gc_unregister_address(&nxt_ruby_call);
1319 }
1320
1321 if (nxt_ruby_hook_procs != Qnil) {
1322 rb_gc_unregister_address(&nxt_ruby_hook_procs);
1323 }
1324
1325 nxt_ruby_done_strings();
1326
1327 ruby_cleanup(0);
1328 }
1329
1330
1331 static int
nxt_ruby_ready_handler(nxt_unit_ctx_t * ctx)1332 nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
1333 {
1334 VALUE res;
1335 uint32_t i;
1336 nxt_ruby_ctx_t *rctx;
1337 nxt_ruby_app_conf_t *c;
1338
1339 c = ctx->unit->data;
1340
1341 if (c->threads <= 1) {
1342 return NXT_UNIT_OK;
1343 }
1344
1345 for (i = 0; i < c->threads - 1; i++) {
1346 rctx = &nxt_ruby_ctxs[i];
1347
1348 rctx->ctx = ctx;
1349
1350 res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx);
1351
1352 if (nxt_fast_path(res != Qnil)) {
1353 nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
1354
1355 rctx->thread = res;
1356
1357 } else {
1358 nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1));
1359
1360 return NXT_UNIT_ERROR;
1361 }
1362 }
1363
1364 return NXT_UNIT_OK;
1365 }
1366
1367
1368 static void *
nxt_ruby_thread_create_gvl(void * rctx)1369 nxt_ruby_thread_create_gvl(void *rctx)
1370 {
1371 VALUE res;
1372
1373 res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
1374
1375 return (void *) (uintptr_t) res;
1376 }
1377
1378
1379 static VALUE
nxt_ruby_thread_func(VALUE arg)1380 nxt_ruby_thread_func(VALUE arg)
1381 {
1382 int state;
1383 nxt_unit_ctx_t *ctx;
1384 nxt_ruby_ctx_t *rctx;
1385
1386 rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
1387
1388 nxt_unit_debug(rctx->ctx, "worker thread start");
1389
1390 ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);
1391 if (nxt_slow_path(ctx == NULL)) {
1392 goto fail;
1393 }
1394
1395 if (nxt_ruby_hook_procs != Qnil) {
1396 rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);
1397 if (nxt_slow_path(state != 0)) {
1398 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
1399 "Failed to call on_thread_boot()");
1400 }
1401 }
1402
1403 (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
1404 nxt_ruby_ubf, ctx);
1405
1406 if (nxt_ruby_hook_procs != Qnil) {
1407 rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);
1408 if (nxt_slow_path(state != 0)) {
1409 nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
1410 "Failed to call on_thread_shutdown()");
1411 }
1412 }
1413
1414 nxt_unit_done(ctx);
1415
1416 fail:
1417
1418 nxt_unit_debug(NULL, "worker thread end");
1419
1420 return Qnil;
1421 }
1422
1423
1424 static void *
nxt_ruby_unit_run(void * ctx)1425 nxt_ruby_unit_run(void *ctx)
1426 {
1427 return (void *) (intptr_t) nxt_unit_run(ctx);
1428 }
1429
1430
1431 static void
nxt_ruby_ubf(void * ctx)1432 nxt_ruby_ubf(void *ctx)
1433 {
1434 nxt_unit_warn(ctx, "Ruby: UBF");
1435 }
1436
1437
1438 static int
nxt_ruby_init_threads(nxt_ruby_app_conf_t * c)1439 nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
1440 {
1441 int state;
1442 uint32_t i;
1443 nxt_ruby_ctx_t *rctx;
1444
1445 if (c->threads <= 1) {
1446 return NXT_UNIT_OK;
1447 }
1448
1449 nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
1450 * (c->threads - 1));
1451 if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
1452 nxt_unit_alert(NULL, "Failed to allocate run contexts array");
1453
1454 return NXT_UNIT_ERROR;
1455 }
1456
1457 for (i = 0; i < c->threads - 1; i++) {
1458 rctx = &nxt_ruby_ctxs[i];
1459
1460 rctx->env = Qnil;
1461 rctx->io_input = Qnil;
1462 rctx->io_error = Qnil;
1463 rctx->thread = Qnil;
1464 }
1465
1466 for (i = 0; i < c->threads - 1; i++) {
1467 rctx = &nxt_ruby_ctxs[i];
1468
1469 rctx->env = rb_protect(nxt_ruby_rack_env_create,
1470 (VALUE) (uintptr_t) rctx, &state);
1471 if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
1472 nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
1473 "Failed to create 'environ' variable");
1474 return NXT_UNIT_ERROR;
1475 }
1476 }
1477
1478 return NXT_UNIT_OK;
1479 }
1480
1481
1482 static void
nxt_ruby_join_threads(nxt_unit_ctx_t * ctx,nxt_ruby_app_conf_t * c)1483 nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
1484 {
1485 uint32_t i;
1486 nxt_ruby_ctx_t *rctx;
1487
1488 if (nxt_ruby_ctxs == NULL) {
1489 return;
1490 }
1491
1492 for (i = 0; i < c->threads - 1; i++) {
1493 rctx = &nxt_ruby_ctxs[i];
1494
1495 if (rctx->thread != Qnil) {
1496 rb_funcall(rctx->thread, rb_intern("join"), 0);
1497
1498 nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
1499
1500 } else {
1501 nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
1502 }
1503 }
1504
1505 for (i = 0; i < c->threads - 1; i++) {
1506 nxt_ruby_ctx_done(&nxt_ruby_ctxs[i]);
1507 }
1508
1509 nxt_unit_free(ctx, nxt_ruby_ctxs);
1510 }
1511