1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 12 typedef struct { 13 nxt_job_t job; 14 nxt_buf_t *out; 15 size_t sent; 16 size_t limit; 17 nxt_work_handler_t ready_handler; 18 } nxt_job_sendfile_t; 19 20 21 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 22 void *data); 23 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 26 void *data); 27 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, 28 nxt_event_conn_t *c, nxt_buf_t *b); 29 30 31 void 32 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) 33 { 34 nxt_event_fd_disable(task->thread->engine, &c->socket); 35 36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 37 nxt_event_conn_job_sendfile_start(task, c, NULL); 38 } 39 40 41 static void 42 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 43 { 44 nxt_iobuf_t b; 45 nxt_event_conn_t *c; 46 nxt_job_sendfile_t *jbs; 47 nxt_sendbuf_coalesce_t sb; 48 49 c = obj; 50 51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 52 53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 54 55 if (nxt_slow_path(jbs == NULL)) { 56 c->write_state->error_handler(task, c, NULL); 57 return; 58 } 59 60 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 61 c->socket.error_handler = c->write_state->error_handler; 62 63 jbs->job.data = c; 64 nxt_job_set_name(&jbs->job, "job sendfile"); 65 66 jbs->limit = nxt_event_conn_write_limit(c); 67 68 if (jbs->limit != 0) { 69 70 sb.buf = c->write; 71 sb.iobuf = &b; 72 sb.nmax = 1; 73 sb.sync = 0; 74 sb.size = 0; 75 sb.limit = jbs->limit; 76 77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 78 79 jbs->job.thread_pool = c->u.thread_pool; 80 jbs->job.log = c->socket.log; 81 jbs->out = c->write; 82 c->write = NULL; 83 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 84 85 c->blocked = 1; 86 87 if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) { 88 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED; 89 } 90 91 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 92 return; 93 } 94 } 95 96 nxt_event_conn_job_sendfile_return(task, jbs, c); 97 } 98 99 100 static void 101 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 102 { 103 ssize_t ret; 104 nxt_buf_t *b; 105 nxt_bool_t first; 106 nxt_event_conn_t *c; 107 nxt_job_sendfile_t *jbs; 108 109 jbs = obj; 110 c = data; 111 112 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 113 114 first = c->socket.write_ready; 115 b = jbs->out; 116 117 do { 118 ret = c->io->sendbuf(c, b, jbs->limit); 119 120 if (ret == NXT_AGAIN) { 121 break; 122 } 123 124 if (nxt_slow_path(ret == NXT_ERROR)) { 125 goto done; 126 } 127 128 jbs->sent += ret; 129 jbs->limit -= ret; 130 131 b = nxt_sendbuf_update(b, ret); 132 133 if (b == NULL) { 134 goto done; 135 } 136 137 if (jbs->limit == 0) { 138 139 if (c->rate == NULL) { 140 jbs->limit = c->max_chunk; 141 goto fast; 142 } 143 144 goto done; 145 } 146 147 } while (c->socket.write_ready); 148 149 if (first && task->thread->thread_pool->work_queue.head != NULL) { 150 goto fast; 151 } 152 153 done: 154 155 nxt_job_return(task, &jbs->job, jbs->ready_handler); 156 return; 157 158 fast: 159 160 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 161 jbs->job.task, jbs, c); 162 163 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 164 } 165 166 167 static void 168 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 169 { 170 size_t sent; 171 nxt_buf_t *b; 172 nxt_bool_t done; 173 nxt_event_conn_t *c; 174 nxt_job_sendfile_t *jbs; 175 176 jbs = obj; 177 c = data; 178 179 c->blocked = 0; 180 181 sent = jbs->sent; 182 c->sent += sent; 183 184 nxt_debug(task, "event conn sendfile sent:%z", sent); 185 186 b = jbs->out; 187 188 /* The job must be destroyed before connection error handler. */ 189 nxt_job_destroy(jbs); 190 191 if (c->write_state->process_buffers) { 192 b = nxt_event_conn_job_sendfile_completion(task, c, b); 193 194 done = (b == NULL); 195 196 /* Add data which might be added after sendfile job has started. */ 197 nxt_buf_chain_add(&b, c->write); 198 c->write = b; 199 200 if (done) { 201 /* All data has been sent. */ 202 203 if (b != NULL) { 204 /* But new data has been added. */ 205 nxt_event_conn_job_sendfile_start(task, c, NULL); 206 } 207 208 return; 209 } 210 } 211 212 if (sent != 0 && c->write_state->autoreset_timer) { 213 nxt_event_timer_disable(&c->write_timer); 214 215 } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) { 216 c->write_timer.state = NXT_EVENT_TIMER_ACTIVE; 217 } 218 219 if (c->socket.error == 0 220 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 221 { 222 nxt_event_conn_timer(task->thread->engine, c, c->write_state, 223 &c->write_timer); 224 225 nxt_event_fd_oneshot_write(task->thread->engine, &c->socket); 226 } 227 228 if (sent != 0) { 229 nxt_event_conn_io_handle(task->thread, c->write_work_queue, 230 c->write_state->ready_handler, 231 task, c, c->socket.data); 232 /* 233 * Fall through if first operations were 234 * successful but the last one failed. 235 */ 236 } 237 238 if (nxt_slow_path(c->socket.error != 0)) { 239 nxt_event_conn_io_handle(task->thread, c->write_work_queue, 240 c->write_state->error_handler, 241 task, c, c->socket.data); 242 } 243 } 244 245 246 static nxt_buf_t * 247 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, 248 nxt_buf_t *b) 249 { 250 while (b != NULL) { 251 252 nxt_prefetch(b->next); 253 254 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 255 break; 256 257 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 258 break; 259 } 260 261 nxt_work_queue_add(c->write_work_queue, 262 b->completion_handler, task, b, b->parent); 263 264 b = b->next; 265 } 266 267 return b; 268 } 269 270 #endif 271