xref: /unit/src/ruby/nxt_ruby_stream_io.c (revision 743:e0f0cd7d244a)
1 
2 /*
3  * Copyright (C) Alexander Borisov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <ruby/nxt_ruby.h>
8 #include <nxt_unit.h>
9 
10 
11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap);
12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
13 static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args);
14 static VALUE nxt_ruby_stream_io_each(VALUE obj, VALUE args);
15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj, VALUE args);
17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx,
20     VALUE val);
21 static VALUE nxt_ruby_stream_io_flush(VALUE obj, VALUE args);
22 
23 
24 VALUE
25 nxt_ruby_stream_io_input_init(void)
26 {
27     VALUE  stream_io;
28 
29     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cData);
30 
31     rb_gc_register_address(&stream_io);
32 
33     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
34     rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
35     rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
36     rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
37     rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
38     rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0);
39 
40     return stream_io;
41 }
42 
43 
44 VALUE
45 nxt_ruby_stream_io_error_init(void)
46 {
47     VALUE  stream_io;
48 
49     stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cData);
50 
51     rb_gc_register_address(&stream_io);
52 
53     rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
54     rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
55     rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
56     rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
57     rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
58 
59     return stream_io;
60 }
61 
62 
63 static VALUE
64 nxt_ruby_stream_io_new(VALUE class, VALUE wrap)
65 {
66     VALUE               self;
67     nxt_ruby_run_ctx_t  *run_ctx;
68 
69     Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx);
70     self = Data_Wrap_Struct(class, 0, 0, run_ctx);
71 
72     rb_obj_call_init(self, 0, NULL);
73 
74     return self;
75 }
76 
77 
78 static VALUE
79 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self)
80 {
81     return self;
82 }
83 
84 
85 static VALUE
86 nxt_ruby_stream_io_gets(VALUE obj, VALUE args)
87 {
88     VALUE                    buf;
89     char                     *p;
90     size_t                   size, b_size;
91     nxt_unit_buf_t           *b;
92     nxt_ruby_run_ctx_t       *run_ctx;
93     nxt_unit_request_info_t  *req;
94 
95     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
96 
97     req = run_ctx->req;
98 
99     if (req->content_length == 0) {
100         return Qnil;
101     }
102 
103     size = 0;
104 
105     for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) {
106         b_size = b->end - b->free;
107         p = memchr(b->free, '\n', b_size);
108 
109         if (p != NULL) {
110             p++;
111             size += p - b->free;
112             break;
113         }
114 
115         size += b_size;
116     }
117 
118     buf = rb_str_buf_new(size);
119 
120     if (buf == Qnil) {
121         return Qnil;
122     }
123 
124     size = nxt_unit_request_read(req, RSTRING_PTR(buf), size);
125 
126     rb_str_set_len(buf, size);
127 
128     return buf;
129 }
130 
131 
132 static VALUE
133 nxt_ruby_stream_io_each(VALUE obj, VALUE args)
134 {
135     VALUE  chunk;
136 
137     if (rb_block_given_p() == 0) {
138         rb_raise(rb_eArgError, "Expected block on rack.input 'each' method");
139     }
140 
141     for ( ;; ) {
142         chunk = nxt_ruby_stream_io_gets(obj, Qnil);
143 
144         if (chunk == Qnil) {
145             return Qnil;
146         }
147 
148         rb_yield(chunk);
149     }
150 
151     return Qnil;
152 }
153 
154 
155 static VALUE
156 nxt_ruby_stream_io_read(VALUE obj, VALUE args)
157 {
158     VALUE                buf;
159     long                 copy_size, u_size;
160     nxt_ruby_run_ctx_t  *run_ctx;
161 
162     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
163 
164     copy_size = run_ctx->req->content_length;
165 
166     if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
167         u_size = NUM2LONG(RARRAY_PTR(args)[0]);
168 
169         if (u_size < 0 || copy_size == 0) {
170             return Qnil;
171         }
172 
173         if (copy_size > u_size) {
174             copy_size = u_size;
175         }
176     }
177 
178     if (copy_size == 0) {
179         return rb_str_new_cstr("");
180     }
181 
182     buf = rb_str_buf_new(copy_size);
183 
184     if (nxt_slow_path(buf == Qnil)) {
185         return Qnil;
186     }
187 
188     copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf),
189                                       copy_size);
190 
191     if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
192 
193         rb_str_set_len(RARRAY_PTR(args)[1], 0);
194         rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size);
195     }
196 
197     rb_str_set_len(buf, copy_size);
198 
199     return buf;
200 }
201 
202 
203 static VALUE
204 nxt_ruby_stream_io_rewind(VALUE obj, VALUE args)
205 {
206     return Qnil;
207 }
208 
209 
210 static VALUE
211 nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
212 {
213     nxt_ruby_run_ctx_t  *run_ctx;
214 
215     if (RARRAY_LEN(args) != 1) {
216         return Qnil;
217     }
218 
219     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
220 
221     nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
222 
223     return Qnil;
224 }
225 
226 
227 static VALUE
228 nxt_ruby_stream_io_write(VALUE obj, VALUE args)
229 {
230     long                len;
231     nxt_ruby_run_ctx_t  *run_ctx;
232 
233     if (RARRAY_LEN(args) != 1) {
234         return Qnil;
235     }
236 
237     Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
238 
239     len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
240 
241     return LONG2FIX(len);
242 }
243 
244 
245 nxt_inline long
246 nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
247 {
248     if (nxt_slow_path(val == Qnil)) {
249         return 0;
250     }
251 
252     if (TYPE(val) != T_STRING) {
253         val = rb_funcall(val, rb_intern("to_s"), 0);
254 
255         if (TYPE(val) != T_STRING) {
256             return 0;
257         }
258     }
259 
260     nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val));
261 
262     return RSTRING_LEN(val);
263 }
264 
265 
266 static VALUE
267 nxt_ruby_stream_io_flush(VALUE obj, VALUE args)
268 {
269     return Qnil;
270 }
271