DPDK17.02 ring 管理1 – rte_ring_create 与 rte_ring_free
该函数用来创建一个ring。
1、ring结构
struct rte_ring 结构的定义如下;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | struct rte_ring { char name[RTE_MEMZONE_NAMESIZE]; /**< ring 的名字 */ int flags; /**< 创建 ring 时提供的 flags */ const struct rte_memzone *memzone; /**< 关联该 ring 的 memzone */ struct prod { /** Ring 生产者的状态 */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t sp_enqueue; /**< 如果为单生产者,则为真 */ uint32_t size; /**< ring 的大小,即包含对象的数量 */ uint32_t mask; /**< Mask (size-1) of ring. */ volatile uint32_t head; /**< 生产者的头部 */ volatile uint32_t tail; /**< 生产者的尾部 */ } prod __rte_cache_aligned; struct cons { /** Ring 消费者的状态 */ uint32_t sc_dequeue; /**< 如果为单消费者,则为真 */ uint32_t size; /**< ring 的大小,即包含对象的数量 */ uint32_t mask; /**< Mask (size-1) of ring. */ volatile uint32_t head; /**< 消费者的头部 */ volatile uint32_t tail; /**< 消费者的尾部 */ #ifdef RTE_RING_SPLIT_PROD_CONS } cons __rte_cache_aligned; #else } cons; #endif #ifdef RTE_LIBRTE_RING_DEBUG struct rte_ring_debug_stats stats[RTE_MAX_LCORE]; #endif void *ring[] __rte_cache_aligned; /**< ring 开始的内存空间,这里没有使用 volatile,因此如果存在编译器重新排序的情况,需要小心处理。 */ }; |
2、rte_ring_create() 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) { char mz_name[RTE_MEMZONE_NAMESIZE]; struct rte_ring *r; struct rte_tailq_entry *te; const struct rte_memzone *mz; ssize_t ring_size; int mz_flags = 0; struct rte_ring_list* ring_list = NULL; int ret; ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); /* 根据ring中元素的数量,计算ring占用的内存空间大小. * 即一个 struct rte_ring 的大小,加上 count 个 sizeof(void *) 的大小*/ ring_size = rte_ring_get_memsize(count); if (ring_size < 0) { rte_errno = ring_size; return NULL; } /* ring 的名字,格式为 RG_<name> */ ret = snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name); if (ret < 0 || ret >= (int)sizeof(mz_name)) { rte_errno = ENAMETOOLONG; return NULL; } /* 创建一个 struct rte_tailq_entry 结构,用来与关联 ring 与 ring_list */ te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); if (te == NULL) { RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n"); rte_errno = ENOMEM; return NULL; } rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); /* 为 ring 分配内存 */ mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags); if (mz != NULL) { r = mz->addr; /* 这里不需要检查返回值,因为前面已经检查过参数了 */ rte_ring_init(r, name, count, flags); te->data = (void *) r; r->memzone = mz; TAILQ_INSERT_TAIL(ring_list, te, next); } else { r = NULL; RTE_LOG(ERR, RING, "Cannot reserve memory\n"); rte_free(te); } rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); return r; } |
3、rte_ring_init() 函数
该函数用来初始化一个分配好的 ring。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, unsigned flags) { int ret; /* 编译时检查,struct rte_ring 结构的大小,以及其中的部分成员的地址,必须是 cacheline 对齐的 */ RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & RTE_CACHE_LINE_MASK) != 0); #ifdef RTE_RING_SPLIT_PROD_CONS RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) & RTE_CACHE_LINE_MASK) != 0); #endif RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); #ifdef RTE_LIBRTE_RING_DEBUG RTE_BUILD_BUG_ON((sizeof(struct rte_ring_debug_stats) & RTE_CACHE_LINE_MASK) != 0); RTE_BUILD_BUG_ON((offsetof(struct rte_ring, stats) & RTE_CACHE_LINE_MASK) != 0); #endif /* 初始化 ring 结构体 */ memset(r, 0, sizeof(*r)); ret = snprintf(r->name, sizeof(r->name), "%s", name); /* 设置 ring 的名字 */ if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; r->prod.watermark = count; r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ); /* 确认是否是单生产者或多生产者 */ r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ); /* 确认是否是单消费者或多消费者 */ r->prod.size = r->cons.size = count; /* ring 中元素的数量 */ r->prod.mask = r->cons.mask = count-1; r->prod.head = r->cons.head = 0; /* 生产者与消费者的头部指向 */ r->prod.tail = r->cons.tail = 0; /* 生产者与消费者的尾部指向 */ return 0; } |
4、rte_ring_free() 函数
该函数用来释放一个 ring 结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | void rte_ring_free(struct rte_ring *r) { struct rte_ring_list *ring_list = NULL; struct rte_tailq_entry *te; if (r == NULL) return; /* ring 不是通过 rte_ring_create() 函数创建的,因此没有对应的memzone */ if (r->memzone == NULL) { RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()"); return; } /* 释放memzone 及其指向的数据,即 ring 结构内存空间 */ if (rte_memzone_free(r->memzone) != 0) { RTE_LOG(ERR, RING, "Cannot free memory\n"); return; } ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); /* 从 ring_list 队列中找到 */ TAILQ_FOREACH(te, ring_list, next) { if (te->data == (void *) r) break; } if (te == NULL) { rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); return; } TAILQ_REMOVE(ring_list, te, next); /* 从队列中移出 */ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); rte_free(te); } |
————————————————————
原创文章,转载请注明: 转载自孙希栋的博客
本文链接地址: 《DPDK17.02 ring 管理1 – rte_ring_create 与 rte_ring_free》