xref: /unit/src/ruby/nxt_ruby_stream_io.c (revision 2093:b82e9e5bf520)
1 
2 /*
3  * Copyright (C) Alexander Borisov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <ruby/nxt_ruby.h>
8 #include <nxt_unit.h>
9 
10 
11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg);
12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
13 static VALUE nxt_ruby_stream_io_gets(VALUE obj);
14 static VALUE nxt_ruby_stream_io_each(VALUE obj);
15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj);
17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val);
20 static VALUE nxt_ruby_stream_io_flush(VALUE obj);
21 static VALUE nxt_ruby_stream_io_close(VALUE obj);
22 
23 
24 VALUE
nxt_ruby_stream_io_input_init(void)25 nxt_ruby_stream_io_input_init(void)
26 {
27     VALUE  stream_io;
28 
29     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cObject);
30 
31     rb_undef_alloc_func(stream_io);
32 
33     rb_gc_register_address(&stream_io);
34 
35     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
36     rb_define_method(stream_io, "initialize",
37                      nxt_ruby_stream_io_initialize, -1);
38     rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
39     rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
40     rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
41     rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0);
42     rb_define_method(stream_io, "close", nxt_ruby_stream_io_close, 0);
43 
44     return stream_io;
45 }
46 
47 
48 VALUE
nxt_ruby_stream_io_error_init(void)49 nxt_ruby_stream_io_error_init(void)
50 {
51     VALUE  stream_io;
52 
53     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cObject);
54 
55     rb_undef_alloc_func(stream_io);
56 
57     rb_gc_register_address(&stream_io);
58 
59     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
60     rb_define_method(stream_io, "initialize",
61                      nxt_ruby_stream_io_initialize, -1);
62     rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
63     rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
64     rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
65     rb_define_method(stream_io, "close", nxt_ruby_stream_io_close, 0);
66 
67     return stream_io;
68 }
69 
70 
71 static VALUE
nxt_ruby_stream_io_new(VALUE class,VALUE arg)72 nxt_ruby_stream_io_new(VALUE class, VALUE arg)
73 {
74     VALUE  self;
75 
76     self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg);
77 
78     rb_obj_call_init(self, 0, NULL);
79 
80     return self;
81 }
82 
83 
84 static VALUE
nxt_ruby_stream_io_initialize(int argc,VALUE * argv,VALUE self)85 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self)
86 {
87     return self;
88 }
89 
90 
91 static VALUE
nxt_ruby_stream_io_gets(VALUE obj)92 nxt_ruby_stream_io_gets(VALUE obj)
93 {
94     VALUE                    buf;
95     ssize_t                  res;
96     nxt_ruby_ctx_t           *rctx;
97     nxt_unit_request_info_t  *req;
98 
99     Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
100     req = rctx->req;
101 
102     if (req->content_length == 0) {
103         return Qnil;
104     }
105 
106     res = nxt_unit_request_readline_size(req, SSIZE_MAX);
107     if (nxt_slow_path(res < 0)) {
108         return Qnil;
109     }
110 
111     buf = rb_str_buf_new(res);
112 
113     if (nxt_slow_path(buf == Qnil)) {
114         return Qnil;
115     }
116 
117     res = nxt_unit_request_read(req, RSTRING_PTR(buf), res);
118 
119     rb_str_set_len(buf, res);
120 
121     return buf;
122 }
123 
124 
125 static VALUE
nxt_ruby_stream_io_each(VALUE obj)126 nxt_ruby_stream_io_each(VALUE obj)
127 {
128     VALUE  chunk;
129 
130     if (rb_block_given_p() == 0) {
131         rb_raise(rb_eArgError, "Expected block on rack.input 'each' method");
132     }
133 
134     for ( ;; ) {
135         chunk = nxt_ruby_stream_io_gets(obj);
136 
137         if (chunk == Qnil) {
138             return Qnil;
139         }
140 
141         rb_yield(chunk);
142     }
143 
144     return Qnil;
145 }
146 
147 
148 static VALUE
nxt_ruby_stream_io_read(VALUE obj,VALUE args)149 nxt_ruby_stream_io_read(VALUE obj, VALUE args)
150 {
151     VALUE           buf;
152     long            copy_size, u_size;
153     nxt_ruby_ctx_t  *rctx;
154 
155     Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
156 
157     copy_size = rctx->req->content_length;
158 
159     if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
160         u_size = NUM2LONG(RARRAY_PTR(args)[0]);
161 
162         if (u_size < 0 || copy_size == 0) {
163             return Qnil;
164         }
165 
166         if (copy_size > u_size) {
167             copy_size = u_size;
168         }
169     }
170 
171     if (copy_size == 0) {
172         return rb_str_new_cstr("");
173     }
174 
175     buf = rb_str_buf_new(copy_size);
176 
177     if (nxt_slow_path(buf == Qnil)) {
178         return Qnil;
179     }
180 
181     copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size);
182 
183     if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
184 
185         rb_str_set_len(RARRAY_PTR(args)[1], 0);
186         rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size);
187     }
188 
189     rb_str_set_len(buf, copy_size);
190 
191     return buf;
192 }
193 
194 
195 static VALUE
nxt_ruby_stream_io_rewind(VALUE obj)196 nxt_ruby_stream_io_rewind(VALUE obj)
197 {
198     return Qnil;
199 }
200 
201 
202 static VALUE
nxt_ruby_stream_io_puts(VALUE obj,VALUE args)203 nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
204 {
205     nxt_ruby_ctx_t  *rctx;
206 
207     if (RARRAY_LEN(args) != 1) {
208         return Qnil;
209     }
210 
211     Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
212 
213     nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
214 
215     return Qnil;
216 }
217 
218 
219 static VALUE
nxt_ruby_stream_io_write(VALUE obj,VALUE args)220 nxt_ruby_stream_io_write(VALUE obj, VALUE args)
221 {
222     long            len;
223     nxt_ruby_ctx_t  *rctx;
224 
225     if (RARRAY_LEN(args) != 1) {
226         return Qnil;
227     }
228 
229     Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
230 
231     len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
232 
233     return LONG2FIX(len);
234 }
235 
236 
237 nxt_inline long
nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t * rctx,VALUE val)238 nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val)
239 {
240     if (nxt_slow_path(val == Qnil)) {
241         return 0;
242     }
243 
244     if (TYPE(val) != T_STRING) {
245         val = rb_funcall(val, rb_intern("to_s"), 0);
246 
247         if (TYPE(val) != T_STRING) {
248             return 0;
249         }
250     }
251 
252     nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val));
253 
254     return RSTRING_LEN(val);
255 }
256 
257 
258 static VALUE
nxt_ruby_stream_io_flush(VALUE obj)259 nxt_ruby_stream_io_flush(VALUE obj)
260 {
261     return Qnil;
262 }
263 
264 
265 static VALUE
nxt_ruby_stream_io_close(VALUE obj)266 nxt_ruby_stream_io_close(VALUE obj)
267 {
268     return Qnil;
269 }
270