1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 typedef struct { 11 nxt_job_t job; 12 nxt_buf_t *out; 13 size_t sent; 14 size_t limit; 15 nxt_work_handler_t ready_handler; 16 } nxt_job_sendfile_t; 17 18 19 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 24 void *data); 25 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, 26 nxt_conn_t *c, nxt_buf_t *b); 27 28 29 void 30 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c) 31 { 32 nxt_fd_event_disable(task->thread->engine, &c->socket); 33 34 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 35 nxt_event_conn_job_sendfile_start(task, c, NULL); 36 } 37 38 39 static void 40 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 41 { 42 nxt_conn_t *c; 43 nxt_iobuf_t b; 44 nxt_job_sendfile_t *jbs; 45 nxt_sendbuf_coalesce_t sb; 46 47 c = obj; 48 49 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 50 51 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 52 53 if (nxt_slow_path(jbs == NULL)) { 54 c->write_state->error_handler(task, c, NULL); 55 return; 56 } 57 58 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 59 c->socket.error_handler = c->write_state->error_handler; 60 61 jbs->job.data = c; 62 nxt_job_set_name(&jbs->job, "job sendfile"); 63 64 jbs->limit = nxt_event_conn_write_limit(c); 65 66 if (jbs->limit != 0) { 67 68 sb.buf = c->write; 69 sb.iobuf = &b; 70 sb.nmax = 1; 71 sb.sync = 0; 72 sb.size = 0; 73 sb.limit = jbs->limit; 74 75 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 76 77 jbs->job.thread_pool = c->u.thread_pool; 78 jbs->job.log = c->socket.log; 79 jbs->out = c->write; 80 c->write = NULL; 81 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 82 83 c->blocked = 1; 84 85 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 86 return; 87 } 88 } 89 90 nxt_event_conn_job_sendfile_return(task, jbs, c); 91 } 92 93 94 static void 95 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 96 { 97 ssize_t ret; 98 nxt_buf_t *b; 99 nxt_bool_t first; 100 nxt_conn_t *c; 101 nxt_job_sendfile_t *jbs; 102 103 jbs = obj; 104 c = data; 105 106 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 107 108 first = c->socket.write_ready; 109 b = jbs->out; 110 111 do { 112 ret = c->io->sendbuf(c, b, jbs->limit); 113 114 if (ret == NXT_AGAIN) { 115 break; 116 } 117 118 if (nxt_slow_path(ret == NXT_ERROR)) { 119 goto done; 120 } 121 122 jbs->sent += ret; 123 jbs->limit -= ret; 124 125 b = nxt_sendbuf_update(b, ret); 126 127 if (b == NULL) { 128 goto done; 129 } 130 131 if (jbs->limit == 0) { 132 133 if (c->rate == NULL) { 134 jbs->limit = c->max_chunk; 135 goto fast; 136 } 137 138 goto done; 139 } 140 141 } while (c->socket.write_ready); 142 143 if (first && task->thread->thread_pool->work_queue.head != NULL) { 144 goto fast; 145 } 146 147 done: 148 149 nxt_job_return(task, &jbs->job, jbs->ready_handler); 150 return; 151 152 fast: 153 154 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 155 jbs->job.task, jbs, c); 156 157 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 158 } 159 160 161 static void 162 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 163 { 164 size_t sent; 165 nxt_buf_t *b; 166 nxt_bool_t done; 167 nxt_conn_t *c; 168 nxt_job_sendfile_t *jbs; 169 170 jbs = obj; 171 c = data; 172 173 c->blocked = 0; 174 175 sent = jbs->sent; 176 c->sent += sent; 177 178 nxt_debug(task, "event conn sendfile sent:%z", sent); 179 180 b = jbs->out; 181 182 /* The job must be destroyed before connection error handler. */ 183 nxt_job_destroy(task, jbs); 184 185 if (0 /* STUB: c->write_state->process_buffers */) { 186 b = nxt_event_conn_job_sendfile_completion(task, c, b); 187 188 done = (b == NULL); 189 190 /* Add data which might be added after sendfile job has started. */ 191 nxt_buf_chain_add(&b, c->write); 192 c->write = b; 193 194 if (done) { 195 /* All data has been sent. */ 196 197 if (b != NULL) { 198 /* But new data has been added. */ 199 nxt_event_conn_job_sendfile_start(task, c, NULL); 200 } 201 202 return; 203 } 204 } 205 206 if (sent != 0 && c->write_state->timer_autoreset) { 207 nxt_timer_disable(task->thread->engine, &c->write_timer); 208 } 209 210 if (c->socket.error == 0 211 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 212 { 213 nxt_conn_timer(task->thread->engine, c, c->write_state, 214 &c->write_timer); 215 216 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); 217 } 218 219 if (sent != 0) { 220 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 221 task, c, c->socket.data); 222 /* 223 * Fall through if first operations were 224 * successful but the last one failed. 225 */ 226 } 227 228 if (nxt_slow_path(c->socket.error != 0)) { 229 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, 230 task, c, c->socket.data); 231 } 232 } 233 234 235 static nxt_buf_t * 236 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c, 237 nxt_buf_t *b) 238 { 239 while (b != NULL) { 240 241 nxt_prefetch(b->next); 242 243 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 244 break; 245 246 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 247 break; 248 } 249 250 nxt_work_queue_add(c->write_work_queue, 251 b->completion_handler, task, b, b->parent); 252 253 b = b->next; 254 } 255 256 return b; 257 } 258