1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 12 typedef struct { 13 nxt_job_t job; 14 nxt_buf_t *out; 15 size_t sent; 16 size_t limit; 17 nxt_work_handler_t ready_handler; 18 } nxt_job_sendfile_t; 19 20 21 static void nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, 22 void *data); 23 static void nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, 24 void *data); 25 static void nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, 26 void *data); 27 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, 28 nxt_event_conn_t *c, nxt_buf_t *b); 29 30 31 void 32 nxt_event_conn_job_sendfile(nxt_thread_t *thr, nxt_event_conn_t *c) 33 { 34 nxt_event_fd_disable(thr->engine, &c->socket); 35 36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 37 nxt_event_conn_job_sendfile_start(thr, c, NULL); 38 } 39 40 41 static void 42 nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, void *data) 43 { 44 nxt_iobuf_t b; 45 nxt_event_conn_t *c; 46 nxt_job_sendfile_t *jbs; 47 nxt_sendbuf_coalesce_t sb; 48 49 c = obj; 50 51 nxt_log_debug(thr->log, "event conn sendfile fd:%d", c->socket.fd); 52 53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 54 55 if (nxt_slow_path(jbs == NULL)) { 56 c->write_state->error_handler(thr, c, NULL); 57 return; 58 } 59 60 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 61 c->socket.error_handler = c->write_state->error_handler; 62 63 jbs->job.data = c; 64 nxt_job_set_name(&jbs->job, "job sendfile"); 65 66 jbs->limit = nxt_event_conn_write_limit(c); 67 68 if (jbs->limit != 0) { 69 70 sb.buf = c->write; 71 sb.iobuf = &b; 72 sb.nmax = 1; 73 sb.sync = 0; 74 sb.size = 0; 75 sb.limit = jbs->limit; 76 77 if (nxt_sendbuf_mem_coalesce(&sb) != 0 || !sb.sync) { 78 79 jbs->job.thread_pool = c->u.thread_pool; 80 jbs->job.log = c->socket.log; 81 jbs->out = c->write; 82 c->write = NULL; 83 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 84 85 c->blocked = 1; 86 87 if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) { 88 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED; 89 } 90 91 nxt_job_start(thr, &jbs->job, nxt_event_conn_job_sendfile_handler); 92 return; 93 } 94 } 95 96 nxt_event_conn_job_sendfile_return(thr, jbs, c); 97 } 98 99 100 static void 101 nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, void *data) 102 { 103 ssize_t ret; 104 nxt_buf_t *b; 105 nxt_bool_t first; 106 nxt_event_conn_t *c; 107 nxt_job_sendfile_t *jbs; 108 109 jbs = obj; 110 c = data; 111 112 nxt_log_debug(thr->log, "event conn job sendfile fd:%d", c->socket.fd); 113 114 first = c->socket.write_ready; 115 b = jbs->out; 116 117 do { 118 ret = c->io->sendbuf(c, b, jbs->limit); 119 120 if (ret == NXT_AGAIN) { 121 break; 122 } 123 124 if (nxt_slow_path(ret == NXT_ERROR)) { 125 goto done; 126 } 127 128 jbs->sent += ret; 129 jbs->limit -= ret; 130 131 b = nxt_sendbuf_update(b, ret); 132 133 if (b == NULL) { 134 goto done; 135 } 136 137 if (jbs->limit == 0) { 138 139 if (c->rate == NULL) { 140 jbs->limit = c->max_chunk; 141 goto fast; 142 } 143 144 goto done; 145 } 146 147 } while (c->socket.write_ready); 148 149 if (first && thr->thread_pool->work_queue.head != NULL) { 150 goto fast; 151 } 152 153 done: 154 155 nxt_job_return(thr, &jbs->job, jbs->ready_handler); 156 return; 157 158 fast: 159 160 nxt_thread_pool_post(thr->thread_pool, nxt_event_conn_job_sendfile_handler, 161 jbs, c, thr->log); 162 } 163 164 165 static void 166 nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, void *data) 167 { 168 size_t sent; 169 nxt_buf_t *b; 170 nxt_bool_t done; 171 nxt_event_conn_t *c; 172 nxt_job_sendfile_t *jbs; 173 174 jbs = obj; 175 c = data; 176 177 c->blocked = 0; 178 179 sent = jbs->sent; 180 c->sent += sent; 181 182 nxt_log_debug(thr->log, "event conn sendfile sent:%z", sent); 183 184 b = jbs->out; 185 186 /* The job must be destroyed before connection error handler. */ 187 nxt_job_destroy(jbs); 188 189 if (c->write_state->process_buffers) { 190 b = nxt_event_conn_job_sendfile_completion(thr, c, b); 191 192 done = (b == NULL); 193 194 /* Add data which might be added after sendfile job has started. */ 195 nxt_buf_chain_add(&b, c->write); 196 c->write = b; 197 198 if (done) { 199 /* All data has been sent. */ 200 201 if (b != NULL) { 202 /* But new data has been added. */ 203 nxt_event_conn_job_sendfile_start(thr, c, NULL); 204 } 205 206 return; 207 } 208 } 209 210 if (sent != 0 && c->write_state->autoreset_timer) { 211 nxt_event_timer_disable(&c->write_timer); 212 213 } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) { 214 c->write_timer.state = NXT_EVENT_TIMER_ACTIVE; 215 } 216 217 if (c->socket.error == 0 218 && !nxt_event_conn_write_delayed(thr->engine, c, sent)) 219 { 220 nxt_event_conn_timer(thr->engine, c, c->write_state, &c->write_timer); 221 222 nxt_event_fd_oneshot_write(thr->engine, &c->socket); 223 } 224 225 if (sent != 0) { 226 nxt_event_conn_io_handle(thr, c->write_work_queue, 227 c->write_state->ready_handler, 228 c, c->socket.data); 229 /* 230 * Fall through if first operations were 231 * successful but the last one failed. 232 */ 233 } 234 235 if (nxt_slow_path(c->socket.error != 0)) { 236 nxt_event_conn_io_handle(thr, c->write_work_queue, 237 c->write_state->error_handler, 238 c, c->socket.data); 239 } 240 } 241 242 243 static nxt_buf_t * 244 nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, nxt_event_conn_t *c, 245 nxt_buf_t *b) 246 { 247 while (b != NULL) { 248 249 nxt_prefetch(b->next); 250 251 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 252 break; 253 254 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 255 break; 256 } 257 258 nxt_thread_work_queue_add(thr, c->write_work_queue, 259 b->completion_handler, 260 b, b->parent, thr->log); 261 262 b = b->next; 263 } 264 265 return b; 266 } 267 268 #endif 269