DPDK17.02 ring 管理3 – common_ring_mp_enqueue
该函数是针对多生产者的入队函数,其调用关系如下:
1 2 3 | common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table, unsigned n) --> rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned n) --> __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED) |
RTE_RING_QUEUE_FIXED 宏表示只能入队固定数量的对象,这些对象要么全部入队,函数执行成功;要么全部入队失败,函数执行失败。
最新版本的DPDK 里面已经对这些出队入队函数做了简化处理,可参考《DPDK20.05 – rte_ring无锁环形队列的管理》,这里还是以DPDK 17.02 版本为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | static inline int __attribute__((always_inline)) __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; const unsigned max = n; int success; unsigned i, rep = 0; uint32_t mask = r->prod.mask; int ret; /* 避免非必要的cmpset 操作,即使 n 的值为0,下面的操作也会带来性能损耗 */ if (n == 0) return 0; /* 自动移动r->prod.head */ do { /* Reset n to the initial burst count */ n = max; prod_head = r->prod.head; cons_tail = r->cons.tail; /* 减法是在两个无符号32位整数间进行的 *(结果总是会对32位取模,即使 prod_head > cons_tail)。 * 因此 free_entries 的值总是在 0 与 size(ring)-1 之间 */ free_entries = (mask + cons_tail - prod_head); /* 检查 ring 中是否有足够的空间 */ if (unlikely(n > free_entries)) { if (behavior == RTE_RING_QUEUE_FIXED) { /* 只能入队固定数量的对象 */ __RING_STAT_ADD(r, enq_fail, n); return -ENOBUFS; } else { /* 可以入队任意数量的对象 */ /* 完全没有空闲空间 */ if (unlikely(free_entries == 0)) { __RING_STAT_ADD(r, enq_fail, n); return 0; } n = free_entries; } } /* 移动 r->prod.head */ prod_next = prod_head + n; success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next); } while (unlikely(success == 0)); /* 实际执行入队操作。可参考《rte_ring无锁环形队列的管理》 */ ENQUEUE_PTRS(); rte_smp_wmb(); /* 如果超出 watermark 的值 */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } /* 如果还有其他正在进行的入队操作,则等待 */ while (unlikely(r->prod.tail != prod_head)) { rte_pause(); /* 设置 RTE_RING_PAUSE_REP_COUNT 以避免为了等待其它线程结束而等待太长时间。 * 它使抢占线程有机会继续完成 ring 的入队操作。 */ if (RTE_RING_PAUSE_REP_COUNT && ++rep == RTE_RING_PAUSE_REP_COUNT) { rep = 0; sched_yield(); /* 让出CPU */ } } r->prod.tail = prod_next; return ret; } |
————————————————————
原创文章,转载请注明: 转载自孙希栋的博客