xref: /unit/src/ruby/nxt_ruby_stream_io.c (revision 1398:05063d6eec8e)
1 
2 /*
3  * Copyright (C) Alexander Borisov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <ruby/nxt_ruby.h>
8 #include <nxt_unit.h>
9 
10 
11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap);
12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
13 static VALUE nxt_ruby_stream_io_gets(VALUE obj);
14 static VALUE nxt_ruby_stream_io_each(VALUE obj);
15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj);
17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx,
20     VALUE val);
21 static VALUE nxt_ruby_stream_io_flush(VALUE obj);
22 
23 
24 VALUE
25 nxt_ruby_stream_io_input_init(void)
26 {
27     VALUE  stream_io;
28 
29     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cData);
30 
31     rb_gc_register_address(&stream_io);
32 
33     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
34     rb_define_method(stream_io, "initialize",
35                      nxt_ruby_stream_io_initialize, -1);
36     rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
37     rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
38     rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
39     rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0);
40 
41     return stream_io;
42 }
43 
44 
45 VALUE
46 nxt_ruby_stream_io_error_init(void)
47 {
48     VALUE  stream_io;
49 
50     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cData);
51 
52     rb_gc_register_address(&stream_io);
53 
54     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
55     rb_define_method(stream_io, "initialize",
56                      nxt_ruby_stream_io_initialize, -1);
57     rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
58     rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
59     rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
60 
61     return stream_io;
62 }
63 
64 
65 static VALUE
66 nxt_ruby_stream_io_new(VALUE class, VALUE wrap)
67 {
68     VALUE               self;
69     nxt_ruby_run_ctx_t  *run_ctx;
70 
71     Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx);
72     self = Data_Wrap_Struct(class, 0, 0, run_ctx);
73 
74     rb_obj_call_init(self, 0, NULL);
75 
76     return self;
77 }
78 
79 
80 static VALUE
81 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self)
82 {
83     return self;
84 }
85 
86 
87 static VALUE
88 nxt_ruby_stream_io_gets(VALUE obj)
89 {
90     VALUE                    buf;
91     ssize_t                  res;
92     nxt_ruby_run_ctx_t       *run_ctx;
93     nxt_unit_request_info_t  *req;
94 
95     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
96 
97     req = run_ctx->req;
98 
99     if (req->content_length == 0) {
100         return Qnil;
101     }
102 
103     res = nxt_unit_request_readline_size(req, SSIZE_MAX);
104     if (nxt_slow_path(res < 0)) {
105         return Qnil;
106     }
107 
108     buf = rb_str_buf_new(res);
109 
110     if (nxt_slow_path(buf == Qnil)) {
111         return Qnil;
112     }
113 
114     res = nxt_unit_request_read(req, RSTRING_PTR(buf), res);
115 
116     rb_str_set_len(buf, res);
117 
118     return buf;
119 }
120 
121 
122 static VALUE
123 nxt_ruby_stream_io_each(VALUE obj)
124 {
125     VALUE  chunk;
126 
127     if (rb_block_given_p() == 0) {
128         rb_raise(rb_eArgError, "Expected block on rack.input 'each' method");
129     }
130 
131     for ( ;; ) {
132         chunk = nxt_ruby_stream_io_gets(obj);
133 
134         if (chunk == Qnil) {
135             return Qnil;
136         }
137 
138         rb_yield(chunk);
139     }
140 
141     return Qnil;
142 }
143 
144 
145 static VALUE
146 nxt_ruby_stream_io_read(VALUE obj, VALUE args)
147 {
148     VALUE                buf;
149     long                 copy_size, u_size;
150     nxt_ruby_run_ctx_t  *run_ctx;
151 
152     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
153 
154     copy_size = run_ctx->req->content_length;
155 
156     if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
157         u_size = NUM2LONG(RARRAY_PTR(args)[0]);
158 
159         if (u_size < 0 || copy_size == 0) {
160             return Qnil;
161         }
162 
163         if (copy_size > u_size) {
164             copy_size = u_size;
165         }
166     }
167 
168     if (copy_size == 0) {
169         return rb_str_new_cstr("");
170     }
171 
172     buf = rb_str_buf_new(copy_size);
173 
174     if (nxt_slow_path(buf == Qnil)) {
175         return Qnil;
176     }
177 
178     copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf),
179                                       copy_size);
180 
181     if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
182 
183         rb_str_set_len(RARRAY_PTR(args)[1], 0);
184         rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size);
185     }
186 
187     rb_str_set_len(buf, copy_size);
188 
189     return buf;
190 }
191 
192 
193 static VALUE
194 nxt_ruby_stream_io_rewind(VALUE obj)
195 {
196     return Qnil;
197 }
198 
199 
200 static VALUE
201 nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
202 {
203     nxt_ruby_run_ctx_t  *run_ctx;
204 
205     if (RARRAY_LEN(args) != 1) {
206         return Qnil;
207     }
208 
209     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
210 
211     nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
212 
213     return Qnil;
214 }
215 
216 
217 static VALUE
218 nxt_ruby_stream_io_write(VALUE obj, VALUE args)
219 {
220     long                len;
221     nxt_ruby_run_ctx_t  *run_ctx;
222 
223     if (RARRAY_LEN(args) != 1) {
224         return Qnil;
225     }
226 
227     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
228 
229     len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
230 
231     return LONG2FIX(len);
232 }
233 
234 
235 nxt_inline long
236 nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
237 {
238     if (nxt_slow_path(val == Qnil)) {
239         return 0;
240     }
241 
242     if (TYPE(val) != T_STRING) {
243         val = rb_funcall(val, rb_intern("to_s"), 0);
244 
245         if (TYPE(val) != T_STRING) {
246             return 0;
247         }
248     }
249 
250     nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val));
251 
252     return RSTRING_LEN(val);
253 }
254 
255 
256 static VALUE
257 nxt_ruby_stream_io_flush(VALUE obj)
258 {
259     return Qnil;
260 }
261