1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 12 typedef struct { 13 nxt_job_t job; 14 nxt_buf_t *out; 15 size_t sent; 16 size_t limit; 17 nxt_work_handler_t ready_handler; 18 } nxt_job_sendfile_t; 19 20 21 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 22 void *data); 23 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 26 void *data); 27 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, 28 nxt_event_conn_t *c, nxt_buf_t *b); 29 30 31 void 32 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) 33 { 34 nxt_fd_event_disable(task->thread->engine, &c->socket); 35 36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 37 nxt_event_conn_job_sendfile_start(task, c, NULL); 38 } 39 40 41 static void 42 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 43 { 44 nxt_iobuf_t b; 45 nxt_event_conn_t *c; 46 nxt_job_sendfile_t *jbs; 47 nxt_sendbuf_coalesce_t sb; 48 49 c = obj; 50 51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 52 53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 54 55 if (nxt_slow_path(jbs == NULL)) { 56 c->write_state->error_handler(task, c, NULL); 57 return; 58 } 59 60 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 61 c->socket.error_handler = c->write_state->error_handler; 62 63 jbs->job.data = c; 64 nxt_job_set_name(&jbs->job, "job sendfile"); 65 66 jbs->limit = nxt_event_conn_write_limit(c); 67 68 if (jbs->limit != 0) { 69 70 sb.buf = c->write; 71 sb.iobuf = &b; 72 sb.nmax = 1; 73 sb.sync = 0; 74 sb.size = 0; 75 sb.limit = jbs->limit; 76 77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 78 79 jbs->job.thread_pool = c->u.thread_pool; 80 jbs->job.log = c->socket.log; 81 jbs->out = c->write; 82 c->write = NULL; 83 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 84 85 c->blocked = 1; 86 87 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 88 return; 89 } 90 } 91 92 nxt_event_conn_job_sendfile_return(task, jbs, c); 93 } 94 95 96 static void 97 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 98 { 99 ssize_t ret; 100 nxt_buf_t *b; 101 nxt_bool_t first; 102 nxt_event_conn_t *c; 103 nxt_job_sendfile_t *jbs; 104 105 jbs = obj; 106 c = data; 107 108 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 109 110 first = c->socket.write_ready; 111 b = jbs->out; 112 113 do { 114 ret = c->io->sendbuf(c, b, jbs->limit); 115 116 if (ret == NXT_AGAIN) { 117 break; 118 } 119 120 if (nxt_slow_path(ret == NXT_ERROR)) { 121 goto done; 122 } 123 124 jbs->sent += ret; 125 jbs->limit -= ret; 126 127 b = nxt_sendbuf_update(b, ret); 128 129 if (b == NULL) { 130 goto done; 131 } 132 133 if (jbs->limit == 0) { 134 135 if (c->rate == NULL) { 136 jbs->limit = c->max_chunk; 137 goto fast; 138 } 139 140 goto done; 141 } 142 143 } while (c->socket.write_ready); 144 145 if (first && task->thread->thread_pool->work_queue.head != NULL) { 146 goto fast; 147 } 148 149 done: 150 151 nxt_job_return(task, &jbs->job, jbs->ready_handler); 152 return; 153 154 fast: 155 156 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 157 jbs->job.task, jbs, c); 158 159 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 160 } 161 162 163 static void 164 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 165 { 166 size_t sent; 167 nxt_buf_t *b; 168 nxt_bool_t done; 169 nxt_event_conn_t *c; 170 nxt_job_sendfile_t *jbs; 171 172 jbs = obj; 173 c = data; 174 175 c->blocked = 0; 176 177 sent = jbs->sent; 178 c->sent += sent; 179 180 nxt_debug(task, "event conn sendfile sent:%z", sent); 181 182 b = jbs->out; 183 184 /* The job must be destroyed before connection error handler. */ 185 nxt_job_destroy(jbs); 186 187 if (c->write_state->process_buffers) { 188 b = nxt_event_conn_job_sendfile_completion(task, c, b); 189 190 done = (b == NULL); 191 192 /* Add data which might be added after sendfile job has started. */ 193 nxt_buf_chain_add(&b, c->write); 194 c->write = b; 195 196 if (done) { 197 /* All data has been sent. */ 198 199 if (b != NULL) { 200 /* But new data has been added. */ 201 nxt_event_conn_job_sendfile_start(task, c, NULL); 202 } 203 204 return; 205 } 206 } 207 208 if (sent != 0 && c->write_state->autoreset_timer) { 209 nxt_timer_disable(task->thread->engine, &c->write_timer); 210 } 211 212 if (c->socket.error == 0 213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 214 { 215 nxt_event_conn_timer(task->thread->engine, c, c->write_state, 216 &c->write_timer); 217 218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); 219 } 220 221 if (sent != 0) { 222 nxt_event_conn_io_handle(task->thread, c->write_work_queue, 223 c->write_state->ready_handler, 224 task, c, c->socket.data); 225 /* 226 * Fall through if first operations were 227 * successful but the last one failed. 228 */ 229 } 230 231 if (nxt_slow_path(c->socket.error != 0)) { 232 nxt_event_conn_io_handle(task->thread, c->write_work_queue, 233 c->write_state->error_handler, 234 task, c, c->socket.data); 235 } 236 } 237 238 239 static nxt_buf_t * 240 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, 241 nxt_buf_t *b) 242 { 243 while (b != NULL) { 244 245 nxt_prefetch(b->next); 246 247 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 248 break; 249 250 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 251 break; 252 } 253 254 nxt_work_queue_add(c->write_work_queue, 255 b->completion_handler, task, b, b->parent); 256 257 b = b->next; 258 } 259 260 return b; 261 } 262 263 #endif 264