/* xref: /unit/src/test/nxt_cq_test.c (revision 1554:8f22edff911d) */

/*
 * Copyright (C) NGINX, Inc.
 */
5 
#include <nxt_main.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>

#ifndef NXT_NCQ_TEST
#define NXT_NCQ_TEST          1
#endif

#define NXT_QTEST_USE_THREAD  0

#if NXT_NCQ_TEST
#include <nxt_nncq.h>
#else
#include <nxt_nvbcq.h>
#endif
21 
22 
/* Upper bound on the number of benchmark iterations run by main(). */
#define MAX_ITER   20
/* Number of samples over which main() computes mean() and cov(). */
#define STAT_ITER  5
/* main() stops iterating once cov() drops below this threshold. */
#define MIN_COV    0.02

extern char  **environ;

/* Operation budget; every worker loops nops / <its group's size> times. */
static uintptr_t nops = 10000000;

/* Number of workers of each kind; set from the command line in main(). */
static uintptr_t nprocs_enq     = 0;
static uintptr_t nprocs_deq     = 0;
static uintptr_t nprocs_wenq    = 0;
static uintptr_t nprocs_wdeq    = 0;
static uintptr_t nprocs_enq_deq = 0;
static uintptr_t nprocs_cas     = 0;
static uintptr_t nprocs_faa     = 0;

/* Total number of workers; main() recomputes it as the sum of the above. */
static uintptr_t nprocs = 1;
39 
40 
/*
 * Returns the current gettimeofday() time, in microseconds, minus us.
 *
 * Call with 0 to obtain a starting timestamp, then pass that timestamp
 * back to obtain the number of microseconds elapsed since it was taken.
 */
static size_t
elapsed_time(size_t us)
{
    struct timeval  t;

    gettimeofday(&t, NULL);

    /*
     * Convert to size_t before multiplying: the seconds value multiplied by
     * one million overflows a 32-bit signed time_t, which is undefined
     * behaviour, whereas unsigned arithmetic wraps and still yields correct
     * differences between two timestamps.
     */
    return (size_t) t.tv_sec * 1000000 + (size_t) t.tv_usec - us;
}
50 
51 
/*
 * Returns the arithmetic mean of the first n elements of times.
 *
 * n must be positive: with n == 0 the result is 0.0 / 0, i.e. not a number
 * on IEEE 754 implementations.
 */
static double
mean(const double *times, int n)
{
    int     i;
    double  sum;

    sum = 0;

    for (i = 0; i < n; i++) {
        sum += times[i];
    }

    return sum / n;
}
66 
67 
68 static double
cov(const double * times,double mean,int n)69 cov(const double *times, double mean, int n)
70 {
71     int     i;
72     double  variance;
73 
74     variance = 0;
75 
76     for (i = 0; i < n; i++) {
77         variance += (times[i] - mean) * (times[i] - mean);
78     }
79 
80     variance /= n;
81 
82     return sqrt(variance) / mean;
83 }
84 
85 typedef struct {
86 #if NXT_NCQ_TEST
87     nxt_nncq_t   free_queue;
88     nxt_nncq_t   active_queue;
89 #else
90     nxt_nvbcq_t  free_queue;
91     nxt_nvbcq_t  active_queue;
92 #endif
93     uint32_t     counter;
94 } nxt_cq_t;
95 
96 
97 static nxt_cq_t  *pgq;
98 
99 
100 #if NXT_NCQ_TEST
101 #define nxt_cq_enqueue  nxt_nncq_enqueue
102 #define nxt_cq_dequeue  nxt_nncq_dequeue
103 #define nxt_cq_empty    nxt_nncq_empty
104 #define nxt_cq_init     nxt_nncq_init
105 #define NXT_CQ_SIZE     NXT_NNCQ_SIZE
106 #else
107 #define nxt_cq_enqueue  nxt_nvbcq_enqueue
108 #define nxt_cq_dequeue  nxt_nvbcq_dequeue
109 #define nxt_cq_empty    nxt_nvbcq_empty
110 #define nxt_cq_init     nxt_nvbcq_init
111 #define NXT_CQ_SIZE     NXT_NVBCQ_SIZE
112 #endif
113 
114 typedef struct {
115     int                  id;
116     uint64_t             enq;
117     uint64_t             deq;
118     uint64_t             wait_enq;
119     uint64_t             wait_deq;
120     uint64_t             own_res;
121     uint64_t             cas;
122     uint64_t             faa;
123 
124 #if NXT_QTEST_USE_THREAD
125     nxt_thread_handle_t  handle;
126 #else
127     nxt_pid_t            pid;
128     int status;
129 #endif
130 } nxt_worker_info_t;
131 
132 
133 static void
cas_worker(void * p)134 cas_worker(void *p)
135 {
136     nxt_cq_t           *q;
137     uint32_t           c;
138     uintptr_t          i;
139     nxt_worker_info_t  *wi;
140 
141     q = pgq;
142     wi = p;
143 
144     for (i = 0; i < nops / nprocs_cas; i++) {
145         c = q->counter;
146 
147         if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) {
148             ++wi->cas;
149         }
150     }
151 }
152 
153 
154 static void
faa_worker(void * p)155 faa_worker(void *p)
156 {
157     nxt_cq_t           *q;
158     uintptr_t          i;
159     nxt_worker_info_t  *wi;
160 
161     q = pgq;
162     wi = p;
163 
164     for (i = 0; i < nops / nprocs_faa; i++) {
165         nxt_atomic_fetch_add(&q->counter, 1);
166         wi->faa++;
167     }
168 }
169 
170 
171 static void
enq_deq_worker(void * p)172 enq_deq_worker(void *p)
173 {
174     nxt_cq_t           *q;
175     uintptr_t          i, v;
176     nxt_worker_info_t  *wi;
177 
178     q = pgq;
179     wi = p;
180 
181     for (i = 0; i < nops / nprocs_enq_deq; i++) {
182         v = nxt_cq_dequeue(&q->free_queue);
183 
184         if (v != nxt_cq_empty(&q->free_queue)) {
185             nxt_cq_enqueue(&q->active_queue, wi->id);
186             wi->enq++;
187         }
188 
189         v = nxt_cq_dequeue(&q->active_queue);
190 
191         if (v != nxt_cq_empty(&q->active_queue)) {
192             nxt_cq_enqueue(&q->free_queue, v);
193             wi->deq++;
194 
195             if ((int) v == wi->id) {
196                 wi->own_res++;
197             }
198         }
199     }
200 }
201 
202 
203 static void
enq_worker(void * p)204 enq_worker(void *p)
205 {
206     nxt_cq_t           *q;
207     uintptr_t          i, v;
208     nxt_worker_info_t  *wi;
209 
210     q = pgq;
211     wi = p;
212 
213     for (i = 0; i < nops / nprocs_enq; i++) {
214         v = nxt_cq_dequeue(&q->free_queue);
215 
216         if (v != nxt_cq_empty(&q->free_queue)) {
217             nxt_cq_enqueue(&q->active_queue, v);
218             wi->enq++;
219         }
220     }
221 }
222 
223 
224 static void
deq_worker(void * p)225 deq_worker(void *p)
226 {
227     nxt_cq_t           *q;
228     uintptr_t          i, v;
229     nxt_worker_info_t  *wi;
230 
231     q = pgq;
232     wi = p;
233 
234     for (i = 0; i < nops / nprocs_deq; i++) {
235         v = nxt_cq_dequeue(&q->active_queue);
236 
237         if (v != nxt_cq_empty(&q->active_queue)) {
238             nxt_cq_enqueue(&q->free_queue, v);
239             ++wi->deq;
240         }
241     }
242 }
243 
244 
245 static void
wenq_worker(void * p)246 wenq_worker(void *p)
247 {
248     nxt_cq_t           *q;
249     uintptr_t          i, v;
250     nxt_worker_info_t  *wi;
251 
252     q = pgq;
253     wi = p;
254 
255     for (i = 0; i < nops / nprocs_wenq; i++) {
256 
257         do {
258             wi->wait_enq++;
259             v = nxt_cq_dequeue(&q->free_queue);
260         } while (v == nxt_cq_empty(&q->free_queue));
261 
262         nxt_cq_enqueue(&q->active_queue, v);
263 
264         wi->enq++;
265         wi->wait_enq--;
266     }
267 }
268 
269 
270 static void
wdeq_worker(void * p)271 wdeq_worker(void *p)
272 {
273     nxt_cq_t           *q;
274     uintptr_t          i, v;
275     nxt_worker_info_t  *wi;
276 
277     q = pgq;
278     wi = p;
279 
280     for (i = 0; i < nops / nprocs_wdeq; i++) {
281 
282         do {
283             wi->wait_deq++;
284             v = nxt_cq_dequeue(&q->active_queue);
285         } while (v == nxt_cq_empty(&q->active_queue));
286 
287         nxt_cq_enqueue(&q->free_queue, v);
288 
289         wi->deq++;
290         wi->wait_deq--;
291     }
292 }
293 
294 
295 static nxt_int_t
worker_create(nxt_worker_info_t * wi,int id,nxt_thread_start_t start)296 worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start)
297 {
298     wi->id = id;
299 
300 #if NXT_QTEST_USE_THREAD
301     nxt_thread_link_t  *link;
302 
303     link = nxt_zalloc(sizeof(nxt_thread_link_t));
304 
305     link->start = start;
306     link->work.data = wi;
307 
308     return nxt_thread_create(&wi->handle, link);
309 
310 #else
311     pid_t pid = fork();
312 
313     if (pid == 0) {
314         start(wi);
315         exit(0);
316 
317     } else {
318         wi->pid = pid;
319     }
320 
321     return NXT_OK;
322 #endif
323 }
324 
325 
326 static void
worker_wait(nxt_worker_info_t * wi)327 worker_wait(nxt_worker_info_t *wi)
328 {
329 #if NXT_QTEST_USE_THREAD
330     pthread_join(wi->handle, NULL);
331 
332 #else
333     waitpid(wi->pid, &wi->status, 0);
334 #endif
335 }
336 
337 
338 int nxt_cdecl
main(int argc,char ** argv)339 main(int argc, char **argv)
340 {
341     int                i, k, id, verbose, objective, rk;
342     char               *a;
343     size_t             start, elapsed;
344     double             *stats, m, c;
345     uint64_t           total_ops;
346     uintptr_t          j;
347     nxt_task_t         task;
348     nxt_thread_t       *thr;
349     nxt_worker_info_t  *wi;
350     double             times[MAX_ITER], mopsec[MAX_ITER];
351 
352     verbose = 0;
353     objective = 0;
354 
355     for (i = 1; i < argc; i++) {
356         a = argv[i];
357 
358         if (strcmp(a, "-v") == 0) {
359             verbose++;
360             continue;
361         }
362 
363         if (strcmp(a, "-n") == 0 && (i + 1) < argc) {
364             nops = atoi(argv[++i]);
365             continue;
366         }
367 
368         if (strcmp(a, "--enq") == 0 && (i + 1) < argc) {
369             nprocs_enq = atoi(argv[++i]);
370             continue;
371         }
372 
373         if (strcmp(a, "--deq") == 0 && (i + 1) < argc) {
374             nprocs_deq = atoi(argv[++i]);
375             continue;
376         }
377 
378         if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) {
379             nprocs_wenq = atoi(argv[++i]);
380             continue;
381         }
382 
383         if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) {
384             nprocs_wdeq = atoi(argv[++i]);
385             continue;
386         }
387 
388         if (strcmp(a, "--ed") == 0 && (i + 1) < argc) {
389             nprocs_enq_deq = atoi(argv[++i]);
390             continue;
391         }
392 
393         if (strcmp(a, "--cas") == 0 && (i + 1) < argc) {
394             nprocs_cas = atoi(argv[++i]);
395             continue;
396         }
397 
398         if (strcmp(a, "--faa") == 0 && (i + 1) < argc) {
399             nprocs_faa = atoi(argv[++i]);
400             continue;
401         }
402 
403         if (strcmp(a, "--obj") == 0 && (i + 1) < argc) {
404             objective = atoi(argv[++i]);
405             continue;
406         }
407 
408         printf("unknown option %s", a);
409 
410         return 1;
411     }
412 
413     if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) {
414         return 1;
415     }
416 
417     nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq
418              + nprocs_enq_deq + nprocs_cas + nprocs_faa;
419 
420     if (nprocs == 0) {
421         return 0;
422     }
423 
424     nxt_main_log.level = NXT_LOG_INFO;
425     task.log  = &nxt_main_log;
426 
427     thr = nxt_thread();
428     thr->task = &task;
429 
430     pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE,
431                MAP_ANON | MAP_SHARED, -1, 0);
432     if (pgq == MAP_FAILED) {
433         return 2;
434     }
435 
436     nxt_cq_init(&pgq->free_queue);
437     nxt_cq_init(&pgq->active_queue);
438 
439     for(i = 0; i < NXT_CQ_SIZE; i++) {
440         nxt_cq_enqueue(&pgq->free_queue, i);
441     }
442 
443     if (verbose >= 1) {
444         printf("number of workers: %d\n", (int) nprocs);
445         printf("number of ops:     %d\n", (int) nops);
446     }
447 
448     wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE,
449               MAP_ANON | MAP_SHARED, -1, 0);
450     if (wi == MAP_FAILED) {
451         return 3;
452     }
453 
454     for (k = 0; k < MAX_ITER; k++) {
455         nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t));
456 
457         nxt_cq_init(&pgq->free_queue);
458         nxt_cq_init(&pgq->active_queue);
459 
460         for(i = 0; i < NXT_CQ_SIZE; i++) {
461             nxt_cq_enqueue(&pgq->free_queue, i);
462         }
463 
464         start = elapsed_time(0);
465 
466         id = 0;
467 
468         for (j = 0; j < nprocs_enq; j++, id++) {
469             worker_create(wi + id, id, enq_worker);
470         }
471 
472         for (j = 0; j < nprocs_deq; j++, id++) {
473             worker_create(wi + id, id, deq_worker);
474         }
475 
476         for (j = 0; j < nprocs_wenq; j++, id++) {
477             worker_create(wi + id, id, wenq_worker);
478         }
479 
480         for (j = 0; j < nprocs_wdeq; j++, id++) {
481             worker_create(wi + id, id, wdeq_worker);
482         }
483 
484         for (j = 0; j < nprocs_enq_deq; j++, id++) {
485             worker_create(wi + id, id, enq_deq_worker);
486         }
487 
488         for (j = 0; j < nprocs_cas; j++, id++) {
489             worker_create(wi + id, id, cas_worker);
490         }
491 
492         for (j = 0; j < nprocs_faa; j++, id++) {
493             worker_create(wi + id, id, faa_worker);
494         }
495 
496         for (j = 0; j < nprocs; j++) {
497             worker_wait(wi + j);
498         }
499 
500         elapsed = elapsed_time(start);
501 
502         for (j = 1; j < nprocs; j++) {
503             wi[0].enq += wi[j].enq;
504             wi[0].deq += wi[j].deq;
505             wi[0].wait_enq += wi[j].wait_enq;
506             wi[0].wait_deq += wi[j].wait_deq;
507             wi[0].own_res += wi[j].own_res;
508             wi[0].cas += wi[j].cas;
509             wi[0].faa += wi[j].faa;
510         }
511 
512         total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa;
513 
514         if (total_ops == 0) {
515             total_ops = nops;
516         }
517 
518         times[k] = elapsed / 1000.0;
519         mopsec[k] = (double) total_ops / elapsed;
520 
521         if (verbose >= 2) {
522             printf("enq        %10"PRIu64"\n", wi[0].enq);
523             printf("deq        %10"PRIu64"\n", wi[0].deq);
524             printf("wait_enq   %10"PRIu64"\n", wi[0].wait_enq);
525             printf("wait_deq   %10"PRIu64"\n", wi[0].wait_deq);
526             printf("own_res    %10"PRIu64"\n", wi[0].own_res);
527             printf("cas        %10"PRIu64"\n", wi[0].cas);
528             printf("faa        %10"PRIu64"\n", wi[0].faa);
529             printf("total ops  %10"PRIu64"\n", total_ops);
530             printf("Mops/sec   %13.2f\n", mopsec[k]);
531 
532             printf("elapsed    %10d us\n", (int) elapsed);
533             printf("per op     %10d ns\n", (int) ((1000 * elapsed) / total_ops));
534         }
535 
536         if (k >= STAT_ITER) {
537             stats = (objective == 0) ? times : mopsec;
538 
539             m = mean(stats + k - STAT_ITER, STAT_ITER);
540             c = cov(stats + k - STAT_ITER, m, STAT_ITER);
541 
542             if (verbose >= 1) {
543                 if (objective == 0) {
544                     printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
545                            "mean time %.2f ms; cov %.4f\n",
546                            (int) k + 1, times[k], mopsec[k], m, c);
547 
548                 } else {
549                     printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
550                            "mean Mop/sec %.2f; cov %.4f\n",
551                            (int) k + 1, times[k], mopsec[k], m, c);
552                 }
553             }
554 
555             if (c < MIN_COV) {
556                 rk = k - STAT_ITER;
557 
558                 for (i = rk + 1; i <= k; i++) {
559                     if (fabs(stats[i] - m) < fabs(stats[rk] - m)) {
560                         rk = i;
561                     }
562                 }
563 
564                 printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]);
565 
566                 return 0;
567             }
568 
569         } else {
570             if (verbose >= 1) {
571                 printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f\n",
572                        (int) k + 1, times[k], mopsec[k]);
573             }
574         }
575     }
576 
577     return 0;
578 }
579