1 2 /* 3 * Copyright (C) Alexander Borisov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <ruby/nxt_ruby.h> 8 #include <nxt_unit.h> 9 10 11 static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); 12 static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); 13 static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args); 14 static VALUE nxt_ruby_stream_io_each(VALUE obj, VALUE args); 15 static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); 16 static VALUE nxt_ruby_stream_io_rewind(VALUE obj, VALUE args); 17 static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); 18 static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); 19 nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, 20 VALUE val); 21 static VALUE nxt_ruby_stream_io_flush(VALUE obj, VALUE args); 22 23 24 VALUE 25 nxt_ruby_stream_io_input_init(void) 26 { 27 VALUE stream_io; 28 29 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cData); 30 31 rb_gc_register_address(&stream_io); 32 33 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 34 rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); 35 rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); 36 rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); 37 rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); 38 rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0); 39 40 return stream_io; 41 } 42 43 44 VALUE 45 nxt_ruby_stream_io_error_init(void) 46 { 47 VALUE stream_io; 48 49 stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cData); 50 51 rb_gc_register_address(&stream_io); 52 53 rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); 54 rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); 55 rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); 56 rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); 57 rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); 58 59 return stream_io; 60 } 61 62 63 static VALUE 64 nxt_ruby_stream_io_new(VALUE class, VALUE wrap) 65 { 66 VALUE self; 67 nxt_ruby_run_ctx_t *run_ctx; 68 69 Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx); 70 self = Data_Wrap_Struct(class, 0, 0, run_ctx); 71 72 rb_obj_call_init(self, 0, NULL); 73 74 return self; 75 } 76 77 78 static VALUE 79 nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self) 80 { 81 return self; 82 } 83 84 85 static VALUE 86 nxt_ruby_stream_io_gets(VALUE obj, VALUE args) 87 { 88 VALUE buf; 89 char *p; 90 size_t size, b_size; 91 nxt_unit_buf_t *b; 92 nxt_ruby_run_ctx_t *run_ctx; 93 nxt_unit_request_info_t *req; 94 95 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 96 97 req = run_ctx->req; 98 99 if (req->content_length == 0) { 100 return Qnil; 101 } 102 103 size = 0; 104 105 for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) { 106 b_size = b->end - b->free; 107 p = memchr(b->free, '\n', b_size); 108 109 if (p != NULL) { 110 p++; 111 size += p - b->free; 112 break; 113 } 114 115 size += b_size; 116 } 117 118 buf = rb_str_buf_new(size); 119 120 if (buf == Qnil) { 121 return Qnil; 122 } 123 124 size = nxt_unit_request_read(req, RSTRING_PTR(buf), size); 125 126 rb_str_set_len(buf, size); 127 128 return buf; 129 } 130 131 132 static VALUE 133 nxt_ruby_stream_io_each(VALUE obj, VALUE args) 134 { 135 VALUE chunk; 136 137 if (rb_block_given_p() == 0) { 138 rb_raise(rb_eArgError, "Expected block on rack.input 'each' method"); 139 } 140 141 for ( ;; ) { 142 chunk = nxt_ruby_stream_io_gets(obj, Qnil); 143 144 if (chunk == Qnil) { 145 return Qnil; 146 } 147 148 rb_yield(chunk); 149 } 150 151 return Qnil; 152 } 153 154 155 static VALUE 156 nxt_ruby_stream_io_read(VALUE obj, VALUE args) 157 { 158 VALUE buf; 159 long copy_size, u_size; 160 nxt_ruby_run_ctx_t *run_ctx; 161 162 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 163 164 copy_size = run_ctx->req->content_length; 165 166 if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { 167 u_size = NUM2LONG(RARRAY_PTR(args)[0]); 168 169 if (u_size < 0 || copy_size == 0) { 170 return Qnil; 171 } 172 173 if (copy_size > u_size) { 174 copy_size = u_size; 175 } 176 } 177 178 if (copy_size == 0) { 179 return rb_str_new_cstr(""); 180 } 181 182 buf = rb_str_buf_new(copy_size); 183 184 if (nxt_slow_path(buf == Qnil)) { 185 return Qnil; 186 } 187 188 copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf), 189 copy_size); 190 191 if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { 192 193 rb_str_set_len(RARRAY_PTR(args)[1], 0); 194 rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size); 195 } 196 197 rb_str_set_len(buf, copy_size); 198 199 return buf; 200 } 201 202 203 static VALUE 204 nxt_ruby_stream_io_rewind(VALUE obj, VALUE args) 205 { 206 return Qnil; 207 } 208 209 210 static VALUE 211 nxt_ruby_stream_io_puts(VALUE obj, VALUE args) 212 { 213 nxt_ruby_run_ctx_t *run_ctx; 214 215 if (RARRAY_LEN(args) != 1) { 216 return Qnil; 217 } 218 219 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 220 221 nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); 222 223 return Qnil; 224 } 225 226 227 static VALUE 228 nxt_ruby_stream_io_write(VALUE obj, VALUE args) 229 { 230 long len; 231 nxt_ruby_run_ctx_t *run_ctx; 232 233 if (RARRAY_LEN(args) != 1) { 234 return Qnil; 235 } 236 237 Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); 238 239 len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); 240 241 return LONG2FIX(len); 242 } 243 244 245 nxt_inline long 246 nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) 247 { 248 if (nxt_slow_path(val == Qnil)) { 249 return 0; 250 } 251 252 if (TYPE(val) != T_STRING) { 253 val = rb_funcall(val, rb_intern("to_s"), 0); 254 255 if (TYPE(val) != T_STRING) { 256 return 0; 257 } 258 } 259 260 nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val)); 261 262 return RSTRING_LEN(val); 263 } 264 265 266 static VALUE 267 nxt_ruby_stream_io_flush(VALUE obj, VALUE args) 268 { 269 return Qnil; 270 } 271