1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10#if (NXT_THREADS) 11 12typedef struct { 13 nxt_job_t job; 14 nxt_buf_t *out; 15 size_t sent; 16 size_t limit; 17 nxt_work_handler_t ready_handler; 18} nxt_job_sendfile_t; 19 20 21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 22 void *data); 23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 24 void *data); 25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 26 void *data); 27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10#if (NXT_THREADS) 11 12typedef struct { 13 nxt_job_t job; 14 nxt_buf_t *out; 15 size_t sent; 16 size_t limit; 17 nxt_work_handler_t ready_handler; 18} nxt_job_sendfile_t; 19 20 21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 22 void *data); 23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 24 void *data); 25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 26 void *data); 27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
|
28 nxt_event_conn_t *c, nxt_buf_t *b);
| 28 nxt_conn_t *c, nxt_buf_t *b);
|
29 30 31void
| 29 30 31void
|
32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
| 32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
|
33{ 34 nxt_fd_event_disable(task->thread->engine, &c->socket); 35 36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 37 nxt_event_conn_job_sendfile_start(task, c, NULL); 38} 39 40 41static void 42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 43{
| 33{ 34 nxt_fd_event_disable(task->thread->engine, &c->socket); 35 36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 37 nxt_event_conn_job_sendfile_start(task, c, NULL); 38} 39 40 41static void 42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 43{
|
| 44 nxt_conn_t *c;
|
44 nxt_iobuf_t b;
| 45 nxt_iobuf_t b;
|
45 nxt_event_conn_t *c;
| |
46 nxt_job_sendfile_t *jbs; 47 nxt_sendbuf_coalesce_t sb; 48 49 c = obj; 50 51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 52 53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 54 55 if (nxt_slow_path(jbs == NULL)) { 56 c->write_state->error_handler(task, c, NULL); 57 return; 58 } 59 60 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 61 c->socket.error_handler = c->write_state->error_handler; 62 63 jbs->job.data = c; 64 nxt_job_set_name(&jbs->job, "job sendfile"); 65 66 jbs->limit = nxt_event_conn_write_limit(c); 67 68 if (jbs->limit != 0) { 69 70 sb.buf = c->write; 71 sb.iobuf = &b; 72 sb.nmax = 1; 73 sb.sync = 0; 74 sb.size = 0; 75 sb.limit = jbs->limit; 76 77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 78 79 jbs->job.thread_pool = c->u.thread_pool; 80 jbs->job.log = c->socket.log; 81 jbs->out = c->write; 82 c->write = NULL; 83 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 84 85 c->blocked = 1; 86 87 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 88 return; 89 } 90 } 91 92 nxt_event_conn_job_sendfile_return(task, jbs, c); 93} 94 95 96static void 97nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 98{ 99 ssize_t ret; 100 nxt_buf_t *b; 101 nxt_bool_t first;
| 46 nxt_job_sendfile_t *jbs; 47 nxt_sendbuf_coalesce_t sb; 48 49 c = obj; 50 51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 52 53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 54 55 if (nxt_slow_path(jbs == NULL)) { 56 c->write_state->error_handler(task, c, NULL); 57 return; 58 } 59 60 c->socket.write_handler = nxt_event_conn_job_sendfile_start; 61 c->socket.error_handler = c->write_state->error_handler; 62 63 jbs->job.data = c; 64 nxt_job_set_name(&jbs->job, "job sendfile"); 65 66 jbs->limit = nxt_event_conn_write_limit(c); 67 68 if (jbs->limit != 0) { 69 70 sb.buf = c->write; 71 sb.iobuf = &b; 72 sb.nmax = 1; 73 sb.sync = 0; 74 sb.size = 0; 75 sb.limit = jbs->limit; 76 77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 78 79 jbs->job.thread_pool = c->u.thread_pool; 80 jbs->job.log = c->socket.log; 81 jbs->out = c->write; 82 c->write = NULL; 83 jbs->ready_handler = nxt_event_conn_job_sendfile_return; 84 85 c->blocked = 1; 86 87 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 88 return; 89 } 90 } 91 92 nxt_event_conn_job_sendfile_return(task, jbs, c); 93} 94 95 96static void 97nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 98{ 99 ssize_t ret; 100 nxt_buf_t *b; 101 nxt_bool_t first;
|
102 nxt_event_conn_t *c;
| 102 nxt_conn_t *c;
|
103 nxt_job_sendfile_t *jbs; 104 105 jbs = obj; 106 c = data; 107 108 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 109 110 first = c->socket.write_ready; 111 b = jbs->out; 112 113 do { 114 ret = c->io->sendbuf(c, b, jbs->limit); 115 116 if (ret == NXT_AGAIN) { 117 break; 118 } 119 120 if (nxt_slow_path(ret == NXT_ERROR)) { 121 goto done; 122 } 123 124 jbs->sent += ret; 125 jbs->limit -= ret; 126 127 b = nxt_sendbuf_update(b, ret); 128 129 if (b == NULL) { 130 goto done; 131 } 132 133 if (jbs->limit == 0) { 134 135 if (c->rate == NULL) { 136 jbs->limit = c->max_chunk; 137 goto fast; 138 } 139 140 goto done; 141 } 142 143 } while (c->socket.write_ready); 144 145 if (first && task->thread->thread_pool->work_queue.head != NULL) { 146 goto fast; 147 } 148 149done: 150 151 nxt_job_return(task, &jbs->job, jbs->ready_handler); 152 return; 153 154fast: 155 156 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 157 jbs->job.task, jbs, c); 158 159 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 160} 161 162 163static void 164nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 165{ 166 size_t sent; 167 nxt_buf_t *b; 168 nxt_bool_t done;
| 103 nxt_job_sendfile_t *jbs; 104 105 jbs = obj; 106 c = data; 107 108 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 109 110 first = c->socket.write_ready; 111 b = jbs->out; 112 113 do { 114 ret = c->io->sendbuf(c, b, jbs->limit); 115 116 if (ret == NXT_AGAIN) { 117 break; 118 } 119 120 if (nxt_slow_path(ret == NXT_ERROR)) { 121 goto done; 122 } 123 124 jbs->sent += ret; 125 jbs->limit -= ret; 126 127 b = nxt_sendbuf_update(b, ret); 128 129 if (b == NULL) { 130 goto done; 131 } 132 133 if (jbs->limit == 0) { 134 135 if (c->rate == NULL) { 136 jbs->limit = c->max_chunk; 137 goto fast; 138 } 139 140 goto done; 141 } 142 143 } while (c->socket.write_ready); 144 145 if (first && task->thread->thread_pool->work_queue.head != NULL) { 146 goto fast; 147 } 148 149done: 150 151 nxt_job_return(task, &jbs->job, jbs->ready_handler); 152 return; 153 154fast: 155 156 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 157 jbs->job.task, jbs, c); 158 159 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 160} 161 162 163static void 164nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 165{ 166 size_t sent; 167 nxt_buf_t *b; 168 nxt_bool_t done;
|
169 nxt_event_conn_t *c;
| 169 nxt_conn_t *c;
|
170 nxt_job_sendfile_t *jbs; 171 172 jbs = obj; 173 c = data; 174 175 c->blocked = 0; 176 177 sent = jbs->sent; 178 c->sent += sent; 179 180 nxt_debug(task, "event conn sendfile sent:%z", sent); 181 182 b = jbs->out; 183 184 /* The job must be destroyed before connection error handler. */ 185 nxt_job_destroy(task, jbs); 186 187 if (0 /* STUB: c->write_state->process_buffers */) { 188 b = nxt_event_conn_job_sendfile_completion(task, c, b); 189 190 done = (b == NULL); 191 192 /* Add data which might be added after sendfile job has started. */ 193 nxt_buf_chain_add(&b, c->write); 194 c->write = b; 195 196 if (done) { 197 /* All data has been sent. */ 198 199 if (b != NULL) { 200 /* But new data has been added. */ 201 nxt_event_conn_job_sendfile_start(task, c, NULL); 202 } 203 204 return; 205 } 206 } 207 208 if (sent != 0 && c->write_state->timer_autoreset) { 209 nxt_timer_disable(task->thread->engine, &c->write_timer); 210 } 211 212 if (c->socket.error == 0 213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 214 {
| 170 nxt_job_sendfile_t *jbs; 171 172 jbs = obj; 173 c = data; 174 175 c->blocked = 0; 176 177 sent = jbs->sent; 178 c->sent += sent; 179 180 nxt_debug(task, "event conn sendfile sent:%z", sent); 181 182 b = jbs->out; 183 184 /* The job must be destroyed before connection error handler. */ 185 nxt_job_destroy(task, jbs); 186 187 if (0 /* STUB: c->write_state->process_buffers */) { 188 b = nxt_event_conn_job_sendfile_completion(task, c, b); 189 190 done = (b == NULL); 191 192 /* Add data which might be added after sendfile job has started. */ 193 nxt_buf_chain_add(&b, c->write); 194 c->write = b; 195 196 if (done) { 197 /* All data has been sent. */ 198 199 if (b != NULL) { 200 /* But new data has been added. */ 201 nxt_event_conn_job_sendfile_start(task, c, NULL); 202 } 203 204 return; 205 } 206 } 207 208 if (sent != 0 && c->write_state->timer_autoreset) { 209 nxt_timer_disable(task->thread->engine, &c->write_timer); 210 } 211 212 if (c->socket.error == 0 213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 214 {
|
215 nxt_event_conn_timer(task->thread->engine, c, c->write_state, 216 &c->write_timer);
| 215 nxt_conn_timer(task->thread->engine, c, c->write_state, 216 &c->write_timer);
|
217 218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); 219 } 220 221 if (sent != 0) { 222 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 223 task, c, c->socket.data); 224 /* 225 * Fall through if first operations were 226 * successful but the last one failed. 227 */ 228 } 229 230 if (nxt_slow_path(c->socket.error != 0)) { 231 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, 232 task, c, c->socket.data); 233 } 234} 235 236 237static nxt_buf_t *
| 217 218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); 219 } 220 221 if (sent != 0) { 222 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 223 task, c, c->socket.data); 224 /* 225 * Fall through if first operations were 226 * successful but the last one failed. 227 */ 228 } 229 230 if (nxt_slow_path(c->socket.error != 0)) { 231 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, 232 task, c, c->socket.data); 233 } 234} 235 236 237static nxt_buf_t *
|
238nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
| 238nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
|
239 nxt_buf_t *b) 240{ 241 while (b != NULL) { 242 243 nxt_prefetch(b->next); 244 245 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 246 break; 247 248 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 249 break; 250 } 251 252 nxt_work_queue_add(c->write_work_queue, 253 b->completion_handler, task, b, b->parent); 254 255 b = b->next; 256 } 257 258 return b; 259} 260 261#endif
| 239 nxt_buf_t *b) 240{ 241 while (b != NULL) { 242 243 nxt_prefetch(b->next); 244 245 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 246 break; 247 248 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 249 break; 250 } 251 252 nxt_work_queue_add(c->write_work_queue, 253 b->completion_handler, task, b, b->parent); 254 255 b = b->next; 256 } 257 258 return b; 259} 260 261#endif
|