TBLEG
扫描微信账号

扫一扫微信二维码

无锁有序链表的实现

2020-05-16 信息
区块链白皮书代写

感谢同事【kevinlynx】在本站发表此文

无锁有序链表可以保证元素唯一性,使其可用于哈希表桶,甚至直接作为一个效率不那么高map。普通链表无锁实现相对简单点,因为插入元素可以在表头插,而有序链表插入则是任意位置。

本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现。

 

主要问题

链表主要操作包含insertremove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

struct node_t {        key_t key;        value_t val;        node_t *next;    };    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {        node_t *pred = head;        node_t *item = head->next;        while (item) {            int d = KEY_CMP(item->key, key);            if (d >= 0) {                *pred_ptr = pred;                *item_ptr = item;                return d == 0 ? TRUE : FALSE;            }            pred = item;            item = item->next;        }         *pred_ptr = pred;        *item_ptr = NULL;        return FALSE;    }    int l_insert(node_t *head, key_t key, value_t val) {        node_t *pred, *item, *new_item;        while (TRUE) {            if (l_find(&pred, &item, head, key)) {                return FALSE;            }            new_item = (node_t*) malloc(sizeof(node_t));            new_item->key = key;            new_item->val = val;            new_item->next = item;            // A. 如果pred本身被移除了            if (CAS(&pred->next, item, new_item)) {                return TRUE;            }            free(new_item);        }    }    int l_remove(node_t *head, key_t key) {        node_t *pred, *item;        while (TRUE) {            if (!l_find(&pred, &item, head, key)) {                return TRUE;            }            // B. 如果pred被移除;如果item也被移除            if (CAS(&pred->next, item, item->next)) {                haz_free(item);                return TRUE;            }        }    }

l_find函数返回查找到前序元素和元素本身,代码A和B虽然拿到了preditem,但在CAS时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素有效性包括其是否还在链表中,其指向内存是否还有效。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐,所以指向node_t指针值低几位是不会用到,从而可以在低几位里设置标志,这样在做CAS时候,就实现了DCAS效果,相当于将两个逻辑上操作变成了一个原子操作。想象下引用计数对象线程安全性,其内包装指针是线程安全,但对象本身不是。

CAS互斥性,在若干个线程CAS相同对象时,只有一个线程会成功,失败线程就可以以此判定目标对象发生了变更。改进后代码(代码仅做示例用,不保证正确):

typedef size_t markable_t;    // 最低位置1,表示元素被删除    #define HAS_MARK(p) ((markable_t)p & 0x01)    #define MARK(p) ((markable_t)p | 0x01)    #define STRIP_MARK(p) ((markable_t)p & ~0x01)    int l_insert(node_t *head, key_t key, value_t val) {        node_t *pred, *item, *new_item;        while (TRUE) {            if (l_find(&pred, &item, head, key)) {                 return FALSE;            }            new_item = (node_t*) malloc(sizeof(node_t));            new_item->key = key;            new_item->val = val;            new_item->next = item;            // A. 虽然find拿到了合法pred,但是在以下代码之前pred可能被删除,此时pred->next被标记            //    pred->next != item,该CAS会失败,失败后重试            if (CAS(&pred->next, item, new_item)) {                return TRUE;            }            free(new_item);        }        return FALSE;    }    int l_remove(node_t *head, key_t key) {        node_t *pred, *item;        while (TRUE) {            if (!l_find(&pred, &item, head, key)) {                return FALSE;            }            node_t *inext = item->next;            // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后            //    删除了item,失败后重试            if (!CAS(&item->next, inext, MARK(inext))) {                continue;            }            // C. 对同一个元素item删除时,只会有一个线程成功走到这里            if (CAS(&pred->next, item, STRIP_MARK(item->next))) {                haz_defer_free(item);                return TRUE;            }        }        return FALSE;    }    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {        node_t *pred = head;        node_t *item = head->next;        hazard_t *hp1 = haz_get(0);        hazard_t *hp2 = haz_get(1);        while (item) {            haz_set_ptr(hp1, pred);            haz_set_ptr(hp2, item);            /*              如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找            */            if (HAS_MARK(item->next)) {                 return l_find(pred_ptr, item_ptr, head, key);            }            int d = KEY_CMP(item->key, key);            if (d >= 0) {                *pred_ptr = pred;                *item_ptr = item;                return d == 0 ? TRUE : FALSE;            }            pred = item;            item = item->
全文阅读
文章关键词
扫描关注微信账号

试试长按二维码加关注