DPDK20.05 hash表2 – rte_hash_add_key
该函数用来向 hash 表中添加表项。
本文以 DPDK 20.05 版本中的hash为例。
1、数据结构
1.1 union ipv4_5tuple_host
该数据结构为hash表中保存的ipv4类型的key结构,是一个128位的结构。
1 2 3 4 5 6 7 8 9 10 11 12 | union ipv4_5tuple_host { struct { uint8_t pad0; /* 填充 */ uint8_t proto; uint16_t pad1; /* 填充 */ uint32_t ip_src; uint32_t ip_dst; uint16_t port_src; uint16_t port_dst; }; xmm_t xmm; /* 128位 */ }; |
1.2 struct ipv4_5tuple
该结构通常为普通程序使用,可用在对外开放的接口中。
在添加key时,用户只需要填充该结构,之后程序将该结构转换为可以向hash表中插入的union ipv4_5tuple_host 结构。
1 2 3 4 5 6 7 | struct ipv4_5tuple { uint32_t ip_dst; uint32_t ip_src; uint16_t port_dst; uint16_t port_src; uint8_t proto; } __rte_packed; |
1.3 union ipv6_5tuple_host
与 union ipv4_5tuple_host 类似,hash表中保存的ipv6的key结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 | union ipv6_5tuple_host { struct { uint16_t pad0; /* 填充 */ uint8_t proto; uint8_t pad1; /* 填充 */ uint8_t ip_src[IPV6_ADDR_LEN]; /* 16字节 * 8 = 128位 */ uint8_t ip_dst[IPV6_ADDR_LEN]; /* 16字节 * 8 = 128位 */ uint16_t port_src; uint16_t port_dst; uint64_t reserve; /* 保留 */ }; xmm_t xmm[XMM_NUM_IN_IPV6_5TUPLE]; /* 128 位 * 3 */ }; |
1.4 struct ipv6_5tuple
与 struct ipv4_5tuple 类似。
1 2 3 4 5 6 7 | struct ipv6_5tuple { uint8_t ip_dst[IPV6_ADDR_LEN]; uint8_t ip_src[IPV6_ADDR_LEN]; uint16_t port_dst; uint16_t port_src; uint8_t proto; } __rte_packed; |
1.5 struct ipv4_l3fwd_em_route
该结构包含两个成员变量,一个用来保存 ipv4 的五元组,另一个表示出接口的索引。
1 2 3 4 | struct ipv4_l3fwd_em_route { struct ipv4_5tuple key; uint8_t if_out; }; |
1.6 struct rte_hash_key
该结构用来保存键值对。
1 2 3 4 5 6 7 8 | struct rte_hash_key { union { uintptr_t idata; void *pdata; }; /* Variable key size */ char key[0]; }; |
该结构通过和 union ipv4_5tuple_host 或 union ipv6_5tuple_host 构成 hash 表中的key entry:
2 key 的定义与转换
2.1 key 的定义
1 2 3 4 5 6 | static struct ipv4_l3fwd_em_route ipv4_l3fwd_em_route_array[] = { {{RTE_IPV4(101, 0, 0, 0), RTE_IPV4(100, 10, 0, 1), 101, 11, IPPROTO_TCP}, 0}, {{RTE_IPV4(201, 0, 0, 0), RTE_IPV4(200, 20, 0, 1), 102, 12, IPPROTO_TCP}, 1}, {{RTE_IPV4(111, 0, 0, 0), RTE_IPV4(100, 30, 0, 1), 101, 11, IPPROTO_TCP}, 2}, {{RTE_IPV4(211, 0, 0, 0), RTE_IPV4(200, 40, 0, 1), 102, 12, IPPROTO_TCP}, 3}, }; |
RTE_IPV4宏的定义如下:
1 2 3 4 | #define RTE_IPV4(a, b, c, d) ((uint32_t)(((a) & 0xff) << 24) | \ (((b) & 0xff) << 16) | \ (((c) & 0xff) << 8) | \ ((d) & 0xff)) |
将一个10进制表示的IPv4地址转换为32位无符号整数。
2.2 将 struct ipv4_5tuple 结构的key转换为 union ipv4_5tuple_host 结构
通过调用 convert_ipv4_5tuple(&entry.key, &newkey) 函数,将 struct ipv4_5tuple 结构的key转换为 union ipv4_5tuple_host 结构。entry与 newkey 两个结构分别定义如下:
1 2 | struct ipv4_l3fwd_em_route entry; union ipv4_5tuple_host newkey; |
convert_ipv4_5tuple() 函数的实现如下:
1 2 3 4 5 6 7 8 9 10 | static void convert_ipv4_5tuple(struct ipv4_5tuple *key1, union ipv4_5tuple_host *key2) { key2->ip_dst = rte_cpu_to_be_32(key1->ip_dst); key2->ip_src = rte_cpu_to_be_32(key1->ip_src); key2->port_dst = rte_cpu_to_be_16(key1->port_dst); key2->port_src = rte_cpu_to_be_16(key1->port_src); key2->proto = key1->proto; key2->pad0 = 0; key2->pad1 = 0; } |
3、向hash表中插入key
3.1 rte_hash_add_key() 函数
该函数向 hash 表中添加新的key,返回的值取了为key所在的位置定义如下:
1 2 3 4 5 | int32_t rte_hash_add_key(const struct rte_hash *h, const void *key) { RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL); /* 参数有效性判断 */ return __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), 0); } |
rte_hash_hash() 函数用来根据 key 的值计算hash值,得到的值在DPDK中被称为 signature。在示例l3fwd中使用了自定义的函数ipv4_hash_crc()和ipv6_hash_crc()。
3.2 __rte_hash_add_key_with_hash() 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 | static inline int32_t __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig, void *data) { uint16_t short_sig; uint32_t prim_bucket_idx, sec_bucket_idx; struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt; struct rte_hash_key *new_k, *keys = h->key_store; uint32_t ext_bkt_id = 0; uint32_t slot_id; int ret; unsigned n_slots; unsigned lcore_id; unsigned int i; struct lcore_cache *cached_free_slots = NULL; int32_t ret_val; struct rte_hash_bucket *last; short_sig = get_short_sig(sig); /* 得到 hash 值的高16位 */ prim_bucket_idx = get_prim_bucket_index(h, sig); /* 得到主bucket的索引 */ sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig); /* 得到次bucket的索引 */ prim_bkt = &h->buckets[prim_bucket_idx]; /* 得到主bucket */ sec_bkt = &h->buckets[sec_bucket_idx]; /* 得到次bucket */ rte_prefetch0(prim_bkt); /* 将两个entry 加载到内存 */ rte_prefetch0(sec_bkt); /* 检查 key 是否已经插入到了主位置 */ __hash_rw_writer_lock(h); ret = search_and_update(h, data, key, prim_bkt, short_sig); if (ret != -1) { /* 找到了该key,更新数据,并返回位置 */ __hash_rw_writer_unlock(h); return ret; } /* 主位置没有,检查 key 是否已经插入到次位置 */ FOR_EACH_BUCKET(cur_bkt, sec_bkt) { ret = search_and_update(h, data, key, cur_bkt, short_sig); if (ret != -1) { /* 在次位置找到了该 key,更新数据,并返回位置 */ __hash_rw_writer_unlock(h); return ret; } } __hash_rw_writer_unlock(h); /* 主bucket和次bucket都没有找到匹配,则获取一个新的slot用来保存新的key */ if (h->use_local_cache) { /* 如果使用了本地cache,则从本地cache中获取 */ lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; /* 从本地cache中获取一个空闲 slot */ if (cached_free_slots->len == 0) { /* 本地cache中没有空间了 */ /* 需要从全局ring中获取一组空闲slots */ n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, cached_free_slots->objs, sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); if (n_slots == 0) { return -ENOSPC; } cached_free_slots->len += n_slots; } /* 从本地cache获取一个空闲slot */ cached_free_slots->len--; slot_id = cached_free_slots->objs[cached_free_slots->len]; } else { /* 如果没有使用本地cache,由直接从 free_slots 中获取一个空闲的key空间 */ /* 该 slot_id 即表示在 h->key_stores中的key的索引 */ if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, sizeof(uint32_t)) != 0) { return -ENOSPC; } } /* 通过得到的空闲 key 的索引,得到对应的 key 的存储空间 */ new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size); /* The store to application data (by the application) at *data should * not leak after the store of pdata in the key store. i.e. pdata is * the guard variable. Release the application data to the readers. */ __atomic_store_n(&new_k->pdata, data, __ATOMIC_RELEASE); /* 拷贝 key (即五元组)到存储空间 */ memcpy(new_k->key, key, h->key_len); /* 该函数先后从主bucket和次bucket及其扩展bucket同中查找,看是否该key已经存在。 * 如果存在,则更新该key的数据,得到对应的位置信息保存到ret_val,并返回1; * 如果不存在,且如果主bucket中有空闲slot,则将其插入主bucket,并回0; * 如果主次bucket中都不存在,且主bucket中没有空闲slot,则返回 -1。 */ ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data, short_sig, slot_id, &ret_val); if (ret == 0) /* 返回 0 表示数据插入成功 */ return slot_id - 1; /* 返回所在的索引 */ else if (ret == 1) { /* 返回 1 表示该 key 在bucket中已存在,把索引再释放回空闲队列 */ enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; } /* 执行到这里,说明该key在主次bucket中均不存在,且主bucket已经满了, * 需要为新entry创建空间。这里执行线程安全(函数内部通过锁实现)的 Cuckoo 算法分配空间 */ ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data, short_sig, prim_bucket_idx, slot_id, &ret_val); if (ret == 0) /* 在主bucket中插入成功,返回索引 id */ return slot_id - 1; else if (ret == 1) { /* bucket中已经存在该key,将空闲索引释放回空闲队列中 */ enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; } /* 在主bucket中创建失败,在次bucket中创建 */ ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data, short_sig, sec_bucket_idx, slot_id, &ret_val); if (ret == 0) /* 在主bucket中插入成功,返回索引 id */ return slot_id - 1; else if (ret == 1) { /* bucket中已经存在该key,将空闲索引释放回空闲队列中 */ enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; } /* 到了这一步还是没有插入成功,且没有启用扩展表,就表示插入失败了 * 把前面得到的空闲slot释放回空闲slot列表 */ if (!h->ext_table_support) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret; } /* 如果使用了扩展表,则需要遍历扩展bucket。这里需要锁,以保护所有使用扩展bucket的进程 */ __hash_rw_writer_lock(h); /* 再次在主bucket和次级bucket中查找,以防在上锁之前的插入 */ ret = search_and_update(h, data, key, prim_bkt, short_sig); if (ret != -1) { /* 如果返回值为-1,表示在上锁之前确实在主bucket中插入了该key */ enqueue_slot_back(h, cached_free_slots, slot_id); goto failure; } FOR_EACH_BUCKET(cur_bkt, sec_bkt) { /* 尝试向次级bucket及扩展bucket中插入key */ ret = search_and_update(h, data, key, cur_bkt, short_sig); if (ret != -1) { /* 如果返回值为-1,表示在上锁之前确实向次或其扩展bucket插入了该key */ enqueue_slot_back(h, cached_free_slots, slot_id); goto failure; } } /* 前面上锁之前其他进程没有插入该key,搜索次级bucket和扩展bucket,以查找空闲entry插入key */ FOR_EACH_BUCKET(cur_bkt, sec_bkt) { for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { /* 检查slot是否有效 */ if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) { cur_bkt->sig_current[i] = short_sig; /* Store to signature and key should not * leak after the store to key_idx. i.e. * key_idx is the guard variable for signature and key. */ __atomic_store_n(&cur_bkt->key_idx[i], slot_id, __ATOMIC_RELEASE); __hash_rw_writer_unlock(h); return slot_id - 1; } } } /* 从扩展buckets没有找到空闲entry。链接到一个新的扩展bucket。先从ring中获取一个空闲bucket。*/ if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id, sizeof(uint32_t)) != 0 || ext_bkt_id == 0) { ret = -ENOSPC; goto failure; } /* 使用新bucket的第一个位置 */ (h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig; /* Store to signature and key should not leak after * the store to key_idx. i.e. key_idx is the guard variable * for signature and key. */ __atomic_store_n(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0], slot_id, __ATOMIC_RELEASE); /* 将新的扩展 bucket 链接到次级 bucket 连接链表 */ last = rte_hash_get_last_bkt(sec_bkt); last->next = &h->buckets_ext[ext_bkt_id - 1]; __hash_rw_writer_unlock(h); return slot_id - 1; failure: __hash_rw_writer_unlock(h); return ret; } |
3.2 search_and_update() 函数
该函数根据key从bucket中查找对应的entry,并更新它的data。
在调用该函数时,调用者需要自己加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | static inline int32_t search_and_update(const struct rte_hash *h, void *data, const void *key, struct rte_hash_bucket *bkt, uint16_t sig) { int i; struct rte_hash_key *k, *keys = h->key_store; for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { /* 遍历该bucket下的所有 entry */ if (bkt->sig_current[i] == sig) { /* 找到 hash 值相等的项 */ k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (rte_hash_cmp_eq(key, k->key, h) == 0) { /* 如果 key 相等,表示 bucket 中已存在 */ /* The store (to application data at *data ) * should not leak after the )store to pdata * in the key store). i.e. pdata is the guard * variable. Release the application data * to the readers. */ __atomic_store_n(&k->pdata, data, __ATOMIC_RELEASE); /* 返回 key 保存的 entry 的索引,由于第一个空间是保留空间,key是从 第 1 个开始只在的,所以这里需要减去1 */ return bkt->key_idx[i] - 1; } } } return -1; } |
3.3 rte_hash_cuckoo_insert_mw() 函数
该函数用来从主次bucket中查找一个空闲的slot并将数据插入。
如果bucket中已经存在且key值完全一致,则返回1;如果插入成功,则返回0;
如果没有空闲slot,则返回 -1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | static inline int32_t rte_hash_cuckoo_insert_mw(const struct rte_hash *h, struct rte_hash_bucket *prim_bkt, struct rte_hash_bucket *sec_bkt, const struct rte_hash_key *key, void *data, uint16_t sig, uint32_t new_idx, int32_t *ret_val) { unsigned int i; struct rte_hash_bucket *cur_bkt; int32_t ret; __hash_rw_writer_lock(h); /* 在主bucket中检查 key在上次检查之后是否已经被插入。*/ ret = search_and_update(h, data, key, prim_bkt, sig); if (ret != -1) { __hash_rw_writer_unlock(h); *ret_val = ret; return 1; } /* 判断次级bucket及扩展bucket中是否已经插入。只有使用了扩展bucket,次级bucket的next变量才会指向这些扩展bucket */ FOR_EACH_BUCKET(cur_bkt, sec_bkt) { ret = search_and_update(h, data, key, cur_bkt, sig); if (ret != -1) { __hash_rw_writer_unlock(h); *ret_val = ret; return 1; } } /* 如果主bucket中有空间,则插入新的entry */ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { /* 如果slot是空闲的 */ if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) { prim_bkt->sig_current[i] = sig; /* 保存hash值 */ /* Store to signature and key should not * leak after the store to key_idx. i.e. * key_idx is the guard variable for signature * and key. 保存索引 */ __atomic_store_n(&prim_bkt->key_idx[i], new_idx, __ATOMIC_RELEASE); break; } } __hash_rw_writer_unlock(h); if (i != RTE_HASH_BUCKET_ENTRIES) /* 说明在8个entry 中找到了空闲slot,否则 i 的值等于8 */ return 0; /* 没有找到空闲 entry */ return -1; } |
3.4 enqueue_slot_back() 函数
调用该函数将索引重新入队到cache/ring,因为slot没被使用,且可以留作后面使用。
1 2 3 4 5 6 7 8 9 | static inline void enqueue_slot_back(const struct rte_hash *h, struct lcore_cache *cached_free_slots, uint32_t slot_id) { if (h->use_local_cache) { /* 如果使用了本地cache,就放回到本地cache */ cached_free_slots->objs[cached_free_slots->len] = slot_id; cached_free_slots->len++; } else /* 如果没有使用本地cache,就将其重新入队到hash表的 free_slots */ rte_ring_sp_enqueue_elem(h->free_slots, &slot_id, sizeof(uint32_t)); } |
3.5 rte_hash_get_last_bkt() 函数
该函数获取 lst_bkt 连接到的最后一个扩展bucket。
1 2 3 4 5 6 | static inline struct rte_hash_bucket *rte_hash_get_last_bkt(struct rte_hash_bucket *lst_bkt) { while (lst_bkt->next != NULL) lst_bkt = lst_bkt->next; return lst_bkt; } |
————————————————————
原创文章,转载请注明: 转载自孙希栋的博客