1 2 /* 3 * Copyright (C) Alexander Borisov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <ruby/nxt_ruby.h> 8 #include <nxt_unit.h> 9 10 11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); 12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); 13 static VALUE nxt_ruby_stream_io_gets(VALUE obj); 14 static VALUE nxt_ruby_stream_io_each(VALUE obj); 15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); 16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj); 17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); 18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); 19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, 20 VALUE val); 21 static VALUE nxt_ruby_stream_io_flush(VALUE obj); 22 23 24 VALUE 25 nxt_ruby_stream_io_input_init(void) 26 { 27 VALUE stream_io; 28 29 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cData); 30 31 rb_gc_register_address(&stream_io); 32 33 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 34 rb_define_method(stream_io, "initialize", 35 nxt_ruby_stream_io_initialize, -1); 36 rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); 37 rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); 38 rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); 39 rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0); 40 41 return stream_io; 42 } 43 44 45 VALUE 46 nxt_ruby_stream_io_error_init(void) 47 { 48 VALUE stream_io; 49 50 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cData); 51 52 rb_gc_register_address(&stream_io); 53 54 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 55 rb_define_method(stream_io, "initialize", 56 nxt_ruby_stream_io_initialize, -1); 57 rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); 58 rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); 59 rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); 60 61 return stream_io; 62 } 63 64 65 static VALUE 66 nxt_ruby_stream_io_new(VALUE class, VALUE wrap) 67 { 68 VALUE self; 69 nxt_ruby_run_ctx_t *run_ctx; 70 71 Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx); 72 self = Data_Wrap_Struct(class, 0, 0, run_ctx); 73 74 rb_obj_call_init(self, 0, NULL); 75 76 return self; 77 } 78 79 80 static VALUE 81 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self) 82 { 83 return self; 84 } 85 86 87 static VALUE 88 nxt_ruby_stream_io_gets(VALUE obj) 89 { 90 VALUE buf; 91 ssize_t res; 92 nxt_ruby_run_ctx_t *run_ctx; 93 nxt_unit_request_info_t *req; 94 95 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 96 97 req = run_ctx->req; 98 99 if (req->content_length == 0) { 100 return Qnil; 101 } 102 103 res = nxt_unit_request_readline_size(req, SSIZE_MAX); 104 if (nxt_slow_path(res < 0)) { 105 return Qnil; 106 } 107 108 buf = rb_str_buf_new(res); 109 110 if (nxt_slow_path(buf == Qnil)) { 111 return Qnil; 112 } 113 114 res = nxt_unit_request_read(req, RSTRING_PTR(buf), res); 115 116 rb_str_set_len(buf, res); 117 118 return buf; 119 } 120 121 122 static VALUE 123 nxt_ruby_stream_io_each(VALUE obj) 124 { 125 VALUE chunk; 126 127 if (rb_block_given_p() == 0) { 128 rb_raise(rb_eArgError, "Expected block on rack.input 'each' method"); 129 } 130 131 for ( ;; ) { 132 chunk = nxt_ruby_stream_io_gets(obj); 133 134 if (chunk == Qnil) { 135 return Qnil; 136 } 137 138 rb_yield(chunk); 139 } 140 141 return Qnil; 142 } 143 144 145 static VALUE 146 nxt_ruby_stream_io_read(VALUE obj, VALUE args) 147 { 148 VALUE buf; 149 long copy_size, u_size; 150 nxt_ruby_run_ctx_t *run_ctx; 151 152 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 153 154 copy_size = run_ctx->req->content_length; 155 156 if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { 157 u_size = NUM2LONG(RARRAY_PTR(args)[0]); 158 159 if (u_size < 0 || copy_size == 0) { 160 return Qnil; 161 } 162 163 if (copy_size > u_size) { 164 copy_size = u_size; 165 } 166 } 167 168 if (copy_size == 0) { 169 return rb_str_new_cstr(""); 170 } 171 172 buf = rb_str_buf_new(copy_size); 173 174 if (nxt_slow_path(buf == Qnil)) { 175 return Qnil; 176 } 177 178 copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf), 179 copy_size); 180 181 if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { 182 183 rb_str_set_len(RARRAY_PTR(args)[1], 0); 184 rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size); 185 } 186 187 rb_str_set_len(buf, copy_size); 188 189 return buf; 190 } 191 192 193 static VALUE 194 nxt_ruby_stream_io_rewind(VALUE obj) 195 { 196 return Qnil; 197 } 198 199 200 static VALUE 201 nxt_ruby_stream_io_puts(VALUE obj, VALUE args) 202 { 203 nxt_ruby_run_ctx_t *run_ctx; 204 205 if (RARRAY_LEN(args) != 1) { 206 return Qnil; 207 } 208 209 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 210 211 nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); 212 213 return Qnil; 214 } 215 216 217 static VALUE 218 nxt_ruby_stream_io_write(VALUE obj, VALUE args) 219 { 220 long len; 221 nxt_ruby_run_ctx_t *run_ctx; 222 223 if (RARRAY_LEN(args) != 1) { 224 return Qnil; 225 } 226 227 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 228 229 len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); 230 231 return LONG2FIX(len); 232 } 233 234 235 nxt_inline long 236 nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) 237 { 238 if (nxt_slow_path(val == Qnil)) { 239 return 0; 240 } 241 242 if (TYPE(val) != T_STRING) { 243 val = rb_funcall(val, rb_intern("to_s"), 0); 244 245 if (TYPE(val) != T_STRING) { 246 return 0; 247 } 248 } 249 250 nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val)); 251 252 return RSTRING_LEN(val); 253 } 254 255 256 static VALUE 257 nxt_ruby_stream_io_flush(VALUE obj) 258 { 259 return Qnil; 260 } 261