1 2 /* 3 * Copyright (C) Alexander Borisov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <ruby/nxt_ruby.h> 8 #include <nxt_unit.h> 9 10 11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg); 12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); 13 static VALUE nxt_ruby_stream_io_gets(VALUE obj); 14 static VALUE nxt_ruby_stream_io_each(VALUE obj); 15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); 16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj); 17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); 18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); 19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val); 20 static VALUE nxt_ruby_stream_io_flush(VALUE obj); 21 22 23 VALUE 24 nxt_ruby_stream_io_input_init(void) 25 { 26 VALUE stream_io; 27 28 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cObject); 29 30 rb_undef_alloc_func(stream_io); 31 32 rb_gc_register_address(&stream_io); 33 34 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 35 rb_define_method(stream_io, "initialize", 36 nxt_ruby_stream_io_initialize, -1); 37 rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); 38 rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); 39 rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); 40 rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0); 41 42 return stream_io; 43 } 44 45 46 VALUE 47 nxt_ruby_stream_io_error_init(void) 48 { 49 VALUE stream_io; 50 51 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cObject); 52 53 rb_undef_alloc_func(stream_io); 54 55 rb_gc_register_address(&stream_io); 56 57 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 58 rb_define_method(stream_io, "initialize", 59 nxt_ruby_stream_io_initialize, -1); 60 rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); 61 rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); 62 rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); 63 64 return stream_io; 65 } 66 67 68 static VALUE 69 nxt_ruby_stream_io_new(VALUE class, VALUE arg) 70 { 71 VALUE self; 72 73 self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg); 74 75 rb_obj_call_init(self, 0, NULL); 76 77 return self; 78 } 79 80 81 static VALUE 82 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self) 83 { 84 return self; 85 } 86 87 88 static VALUE 89 nxt_ruby_stream_io_gets(VALUE obj) 90 { 91 VALUE buf; 92 ssize_t res; 93 nxt_ruby_ctx_t *rctx; 94 nxt_unit_request_info_t *req; 95 96 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); 97 req = rctx->req; 98 99 if (req->content_length == 0) { 100 return Qnil; 101 } 102 103 res = nxt_unit_request_readline_size(req, SSIZE_MAX); 104 if (nxt_slow_path(res < 0)) { 105 return Qnil; 106 } 107 108 buf = rb_str_buf_new(res); 109 110 if (nxt_slow_path(buf == Qnil)) { 111 return Qnil; 112 } 113 114 res = nxt_unit_request_read(req, RSTRING_PTR(buf), res); 115 116 rb_str_set_len(buf, res); 117 118 return buf; 119 } 120 121 122 static VALUE 123 nxt_ruby_stream_io_each(VALUE obj) 124 { 125 VALUE chunk; 126 127 if (rb_block_given_p() == 0) { 128 rb_raise(rb_eArgError, "Expected block on rack.input 'each' method"); 129 } 130 131 for ( ;; ) { 132 chunk = nxt_ruby_stream_io_gets(obj); 133 134 if (chunk == Qnil) { 135 return Qnil; 136 } 137 138 rb_yield(chunk); 139 } 140 141 return Qnil; 142 } 143 144 145 static VALUE 146 nxt_ruby_stream_io_read(VALUE obj, VALUE args) 147 { 148 VALUE buf; 149 long copy_size, u_size; 150 nxt_ruby_ctx_t *rctx; 151 152 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); 153 154 copy_size = rctx->req->content_length; 155 156 if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { 157 u_size = NUM2LONG(RARRAY_PTR(args)[0]); 158 159 if (u_size < 0 || copy_size == 0) { 160 return Qnil; 161 } 162 163 if (copy_size > u_size) { 164 copy_size = u_size; 165 } 166 } 167 168 if (copy_size == 0) { 169 return rb_str_new_cstr(""); 170 } 171 172 buf = rb_str_buf_new(copy_size); 173 174 if (nxt_slow_path(buf == Qnil)) { 175 return Qnil; 176 } 177 178 copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size); 179 180 if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { 181 182 rb_str_set_len(RARRAY_PTR(args)[1], 0); 183 rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size); 184 } 185 186 rb_str_set_len(buf, copy_size); 187 188 return buf; 189 } 190 191 192 static VALUE 193 nxt_ruby_stream_io_rewind(VALUE obj) 194 { 195 return Qnil; 196 } 197 198 199 static VALUE 200 nxt_ruby_stream_io_puts(VALUE obj, VALUE args) 201 { 202 nxt_ruby_ctx_t *rctx; 203 204 if (RARRAY_LEN(args) != 1) { 205 return Qnil; 206 } 207 208 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); 209 210 nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]); 211 212 return Qnil; 213 } 214 215 216 static VALUE 217 nxt_ruby_stream_io_write(VALUE obj, VALUE args) 218 { 219 long len; 220 nxt_ruby_ctx_t *rctx; 221 222 if (RARRAY_LEN(args) != 1) { 223 return Qnil; 224 } 225 226 Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); 227 228 len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]); 229 230 return LONG2FIX(len); 231 } 232 233 234 nxt_inline long 235 nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val) 236 { 237 if (nxt_slow_path(val == Qnil)) { 238 return 0; 239 } 240 241 if (TYPE(val) != T_STRING) { 242 val = rb_funcall(val, rb_intern("to_s"), 0); 243 244 if (TYPE(val) != T_STRING) { 245 return 0; 246 } 247 } 248 249 nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val)); 250 251 return RSTRING_LEN(val); 252 } 253 254 255 static VALUE 256 nxt_ruby_stream_io_flush(VALUE obj) 257 { 258 return Qnil; 259 } 260