1
2 /*
3 * Copyright (C) Alexander Borisov
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <ruby/nxt_ruby.h>
8 #include <nxt_unit.h>
9
10
/*
 * Forward declarations for the Ruby-callable stream methods and for the
 * shared write helper (nxt_ruby_stream_io_s_write) defined later in this
 * file.
 */
11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg);
12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
13 static VALUE nxt_ruby_stream_io_gets(VALUE obj);
14 static VALUE nxt_ruby_stream_io_each(VALUE obj);
15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj);
17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val);
20 static VALUE nxt_ruby_stream_io_flush(VALUE obj);
21 static VALUE nxt_ruby_stream_io_close(VALUE obj);
22
23
24 VALUE
nxt_ruby_stream_io_input_init(void)25 nxt_ruby_stream_io_input_init(void)
26 {
27 VALUE stream_io;
28
29 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cObject);
30
31 rb_undef_alloc_func(stream_io);
32
33 rb_gc_register_address(&stream_io);
34
35 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
36 rb_define_method(stream_io, "initialize",
37 nxt_ruby_stream_io_initialize, -1);
38 rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
39 rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
40 rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
41 rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0);
42 rb_define_method(stream_io, "close", nxt_ruby_stream_io_close, 0);
43
44 return stream_io;
45 }
46
47
48 VALUE
nxt_ruby_stream_io_error_init(void)49 nxt_ruby_stream_io_error_init(void)
50 {
51 VALUE stream_io;
52
53 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cObject);
54
55 rb_undef_alloc_func(stream_io);
56
57 rb_gc_register_address(&stream_io);
58
59 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
60 rb_define_method(stream_io, "initialize",
61 nxt_ruby_stream_io_initialize, -1);
62 rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
63 rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
64 rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
65 rb_define_method(stream_io, "close", nxt_ruby_stream_io_close, 0);
66
67 return stream_io;
68 }
69
70
71 static VALUE
nxt_ruby_stream_io_new(VALUE class,VALUE arg)72 nxt_ruby_stream_io_new(VALUE class, VALUE arg)
73 {
74 VALUE self;
75
76 self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg);
77
78 rb_obj_call_init(self, 0, NULL);
79
80 return self;
81 }
82
83
84 static VALUE
nxt_ruby_stream_io_initialize(int argc,VALUE * argv,VALUE self)85 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self)
86 {
87 return self;
88 }
89
90
91 static VALUE
nxt_ruby_stream_io_gets(VALUE obj)92 nxt_ruby_stream_io_gets(VALUE obj)
93 {
94 VALUE buf;
95 ssize_t res;
96 nxt_ruby_ctx_t *rctx;
97 nxt_unit_request_info_t *req;
98
99 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
100 req = rctx->req;
101
102 if (req->content_length == 0) {
103 return Qnil;
104 }
105
106 res = nxt_unit_request_readline_size(req, SSIZE_MAX);
107 if (nxt_slow_path(res < 0)) {
108 return Qnil;
109 }
110
111 buf = rb_str_buf_new(res);
112
113 if (nxt_slow_path(buf == Qnil)) {
114 return Qnil;
115 }
116
117 res = nxt_unit_request_read(req, RSTRING_PTR(buf), res);
118
119 rb_str_set_len(buf, res);
120
121 return buf;
122 }
123
124
125 static VALUE
nxt_ruby_stream_io_each(VALUE obj)126 nxt_ruby_stream_io_each(VALUE obj)
127 {
128 VALUE chunk;
129
130 if (rb_block_given_p() == 0) {
131 rb_raise(rb_eArgError, "Expected block on rack.input 'each' method");
132 }
133
134 for ( ;; ) {
135 chunk = nxt_ruby_stream_io_gets(obj);
136
137 if (chunk == Qnil) {
138 return Qnil;
139 }
140
141 rb_yield(chunk);
142 }
143
144 return Qnil;
145 }
146
147
148 static VALUE
nxt_ruby_stream_io_read(VALUE obj,VALUE args)149 nxt_ruby_stream_io_read(VALUE obj, VALUE args)
150 {
151 VALUE buf;
152 long copy_size, u_size;
153 nxt_ruby_ctx_t *rctx;
154
155 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
156
157 copy_size = rctx->req->content_length;
158
159 if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
160 u_size = NUM2LONG(RARRAY_PTR(args)[0]);
161
162 if (u_size < 0 || copy_size == 0) {
163 return Qnil;
164 }
165
166 if (copy_size > u_size) {
167 copy_size = u_size;
168 }
169 }
170
171 if (copy_size == 0) {
172 return rb_str_new_cstr("");
173 }
174
175 buf = rb_str_buf_new(copy_size);
176
177 if (nxt_slow_path(buf == Qnil)) {
178 return Qnil;
179 }
180
181 copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size);
182
183 if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
184
185 rb_str_set_len(RARRAY_PTR(args)[1], 0);
186 rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size);
187 }
188
189 rb_str_set_len(buf, copy_size);
190
191 return buf;
192 }
193
194
195 static VALUE
nxt_ruby_stream_io_rewind(VALUE obj)196 nxt_ruby_stream_io_rewind(VALUE obj)
197 {
198 return Qnil;
199 }
200
201
202 static VALUE
nxt_ruby_stream_io_puts(VALUE obj,VALUE args)203 nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
204 {
205 nxt_ruby_ctx_t *rctx;
206
207 if (RARRAY_LEN(args) != 1) {
208 return Qnil;
209 }
210
211 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
212
213 nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
214
215 return Qnil;
216 }
217
218
219 static VALUE
nxt_ruby_stream_io_write(VALUE obj,VALUE args)220 nxt_ruby_stream_io_write(VALUE obj, VALUE args)
221 {
222 long len;
223 nxt_ruby_ctx_t *rctx;
224
225 if (RARRAY_LEN(args) != 1) {
226 return Qnil;
227 }
228
229 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
230
231 len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
232
233 return LONG2FIX(len);
234 }
235
236
237 nxt_inline long
nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t * rctx,VALUE val)238 nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val)
239 {
240 if (nxt_slow_path(val == Qnil)) {
241 return 0;
242 }
243
244 if (TYPE(val) != T_STRING) {
245 val = rb_funcall(val, rb_intern("to_s"), 0);
246
247 if (TYPE(val) != T_STRING) {
248 return 0;
249 }
250 }
251
252 nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val));
253
254 return RSTRING_LEN(val);
255 }
256
257
258 static VALUE
nxt_ruby_stream_io_flush(VALUE obj)259 nxt_ruby_stream_io_flush(VALUE obj)
260 {
261 return Qnil;
262 }
263
264
265 static VALUE
nxt_ruby_stream_io_close(VALUE obj)266 nxt_ruby_stream_io_close(VALUE obj)
267 {
268 return Qnil;
269 }
270