Patch from Maneesh Soni, Dipankar Sarma and probably others.

This patch provides dcache_lock-free d_lookup() using RCU. Al pointed out
races between d_move() and lock-free d_lookup() while a concurrent rename
is going on. We tested this with a test doing a million renames each in 50
threads on 50 different ramfs filesystems, while simultaneously running
millions of "ls". The tests were done on a 4-way SMP box.

1. A lookup can walk into a different bucket when the current dentry is
moved to a different bucket by a rename. This is solved by adding a
list_head pointer (d_bucket) to the dentry structure which points to the
bucket head the dentry belongs to. The bucket pointer is updated when the
dentry is added to a hash chain. If the lookup finds that the current
dentry belongs to a different bucket, the cached lookup fails and a real
lookup is done. This condition occurred about 100 times during the
heavy_rename test.

2. The lookup has found the dentry it is looking for and is comparing the
various keys when a rename operation moves the dentry. This is solved with
a per-dentry counter (d_move_count) which is updated at the end of
d_move(). The lookup takes a snapshot of d_move_count before comparing the
keys; once the comparison succeeds, it takes the per-dentry lock and checks
d_move_count again. If the count differs, the dentry has been moved
(renamed) and the lookup fails. (A stand-alone sketch of this
snapshot/recheck scheme appears after the patch.)

3. There is a theoretical race where a dentry keeps coming back to its
original bucket due to double moves. The lookup may then conclude that the
dentry never moved and end up in an infinite loop. This is solved with a
loop counter which is compared against an approximate maximum number of
dentries per bucket. This never got hit during the heavy_rename test.

4. The loop termination condition in d_lookup() has changed: the next hash
pointer is now checked against the hash table bounds (is_bucket()), so the
walk stops when it reaches any bucket head.

5. memcmp() in d_lookup() can read out of bounds if the name pointer and
length fields are not consistent. For this we use a pointer to a qstr
(d_qstr) to keep the length and the name pointer in one structure.

We also tried solving these races with an rwlock, but it could not compete
with the lockless solution.

 Documentation/filesystems/Locking |    2 
 fs/dcache.c                       |  266 +++++++++++++++++++++++++++++++++++++++-------------
 include/linux/dcache.h            |   21 ++--
 3 files changed, 219 insertions(+), 70 deletions(-)

diff -puN Documentation/filesystems/Locking~dcache_rcu-main Documentation/filesystems/Locking
--- 25/Documentation/filesystems/Locking~dcache_rcu-main	2003-02-14 18:26:42.000000000 -0800
+++ 25-akpm/Documentation/filesystems/Locking	2003-02-14 18:26:42.000000000 -0800
@@ -21,7 +21,7 @@ locking rules:
 		dcache_lock	may block
 d_revalidate:	no		yes
 d_hash		no		yes
-d_compare:	yes		no
+d_compare:	no		no
 d_delete:	yes		no
 d_release:	no		yes
 d_iput:		no		yes
diff -puN fs/dcache.c~dcache_rcu-main fs/dcache.c
--- 25/fs/dcache.c~dcache_rcu-main	2003-02-14 18:26:42.000000000 -0800
+++ 25-akpm/fs/dcache.c	2003-02-14 18:26:42.000000000 -0800
@@ -49,12 +49,30 @@ static unsigned int d_hash_mask;
 static unsigned int d_hash_shift;
 static struct list_head *dentry_hashtable;
 static LIST_HEAD(dentry_unused);
+static int max_dentries;
+static void * hashtable_end;
+
+static inline int is_bucket(void * addr)
+{
+	return ((addr < (void *)dentry_hashtable)
+			|| (addr > hashtable_end) ? 0 : 1);
+}
 
 /* Statistics gathering. */
 struct dentry_stat_t dentry_stat = {
 	.age_limit = 45,
 };
 
+static void d_callback(void *arg)
+{
+	struct dentry * dentry = (struct dentry *)arg;
+
+	if (dname_external(dentry)) {
+		kfree(dentry->d_qstr);
+	}
+	kmem_cache_free(dentry_cache, dentry);
+}
+
 /*
  * no dcache_lock, please. The caller must decrement dentry_stat.nr_dentry
  * inside dcache_lock.
@@ -63,9 +81,7 @@ static void d_free(struct dentry *dentry
 {
 	if (dentry->d_op && dentry->d_op->d_release)
 		dentry->d_op->d_release(dentry);
-	if (dname_external(dentry))
-		kfree(dentry->d_name.name);
-	kmem_cache_free(dentry_cache, dentry);
+	call_rcu(&dentry->d_rcu, d_callback, dentry);
 }
 
 /*
@@ -126,9 +142,13 @@ repeat:
 	if (!atomic_dec_and_lock(&dentry->d_count, &dcache_lock))
 		return;
 
-	/* dput on a free dentry? */
-	if (!list_empty(&dentry->d_lru))
-		BUG();
+	spin_lock(&dentry->d_lock);
+	if (atomic_read(&dentry->d_count)) {
+		spin_unlock(&dentry->d_lock);
+		spin_unlock(&dcache_lock);
+		return;
+	}
+
 	/*
 	 * AV: ->d_delete() is _NOT_ allowed to block now.
 	 */
@@ -139,8 +159,12 @@ repeat:
 	/* Unreachable? Get rid of it */
 	if (d_unhashed(dentry))
 		goto kill_it;
-	list_add(&dentry->d_lru, &dentry_unused);
-	dentry_stat.nr_unused++;
+	if (list_empty(&dentry->d_lru)) {
+		dentry->d_vfs_flags &= ~DCACHE_REFERENCED;
+		list_add(&dentry->d_lru, &dentry_unused);
+		dentry_stat.nr_unused++;
+	}
+	spin_unlock(&dentry->d_lock);
 	dentry->d_vfs_flags |= DCACHE_REFERENCED;
 	spin_unlock(&dcache_lock);
 	return;
@@ -150,7 +174,16 @@ unhash_it:
 kill_it: {
 		struct dentry *parent;
-		list_del(&dentry->d_child);
+
+		/* If the dentry was on the d_lru list,
+		 * delete it from there
+		 */
+		if (!list_empty(&dentry->d_lru)) {
+			list_del(&dentry->d_lru);
+			dentry_stat.nr_unused--;
+		}
+		list_del(&dentry->d_child);
+		spin_unlock(&dentry->d_lock);
 		dentry_stat.nr_dentry--;	/* For d_free, below */
 		/* drops the lock, at that point nobody can reach this dentry */
 		dentry_iput(dentry);
@@ -222,6 +255,7 @@ int d_invalidate(struct dentry * dentry)
 static inline struct dentry * __dget_locked(struct dentry *dentry)
 {
 	atomic_inc(&dentry->d_count);
+	dentry->d_vfs_flags |= DCACHE_REFERENCED;
 	if (atomic_read(&dentry->d_count) == 1) {
 		dentry_stat.nr_unused--;
 		list_del_init(&dentry->d_lru);
 	}
@@ -289,8 +323,8 @@ restart:
 		struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
 		if (!atomic_read(&dentry->d_count)) {
 			__dget_locked(dentry);
+			__d_drop(dentry);
 			spin_unlock(&dcache_lock);
-			d_drop(dentry);
 			dput(dentry);
 			goto restart;
 		}
@@ -310,6 +344,7 @@ static inline void prune_one_dentry(stru
 
 	__d_drop(dentry);
 	list_del(&dentry->d_child);
+	spin_unlock(&dentry->d_lock);
 	dentry_stat.nr_dentry--;	/* For d_free, below */
 	dentry_iput(dentry);
 	parent = dentry->d_parent;
@@ -343,18 +378,24 @@ static void prune_dcache(int count)
 		if (tmp == &dentry_unused)
 			break;
 		list_del_init(tmp);
+		dentry_stat.nr_unused--;
 		dentry = list_entry(tmp, struct dentry, d_lru);
+		spin_lock(&dentry->d_lock);
 		/* If the dentry was recently referenced, don't free it. */
 		if (dentry->d_vfs_flags & DCACHE_REFERENCED) {
 			dentry->d_vfs_flags &= ~DCACHE_REFERENCED;
-			list_add(&dentry->d_lru, &dentry_unused);
+
+			/* don't add non-zero d_count dentries
+			 * back to the d_lru list
+			 */
+			if (!atomic_read(&dentry->d_count)) {
+				list_add(&dentry->d_lru, &dentry_unused);
+				dentry_stat.nr_unused++;
+			}
+			spin_unlock(&dentry->d_lock);
 			continue;
 		}
-		dentry_stat.nr_unused--;
-
-		/* Unused dentry with a count? */
-		BUG_ON(atomic_read(&dentry->d_count));
 		prune_one_dentry(dentry);
 	}
 	spin_unlock(&dcache_lock);
 }
@@ -414,10 +455,13 @@ repeat:
 		dentry = list_entry(tmp, struct dentry, d_lru);
 		if (dentry->d_sb != sb)
 			continue;
-		if (atomic_read(&dentry->d_count))
-			continue;
 		dentry_stat.nr_unused--;
 		list_del_init(tmp);
+		spin_lock(&dentry->d_lock);
+		if (atomic_read(&dentry->d_count)) {
+			spin_unlock(&dentry->d_lock);
+			continue;
+		}
 		prune_one_dentry(dentry);
 		goto repeat;
 	}
@@ -497,8 +541,12 @@ resume:
 		struct list_head *tmp = next;
 		struct dentry *dentry = list_entry(tmp, struct dentry, d_child);
 		next = tmp->next;
+		list_del_init(&dentry->d_lru);
+
+		/* don't add non-zero d_count dentries
+		 * back to the d_lru list
+		 */
 		if (!atomic_read(&dentry->d_count)) {
-			list_del(&dentry->d_lru);
 			list_add(&dentry->d_lru, dentry_unused.prev);
 			found++;
 		}
@@ -551,6 +599,9 @@ void shrink_dcache_parent(struct dentry
  *
  * Prune the dentries that are anonymous
  *
+ * Parsing the d_hash list does not need read_barrier_depends() as it is
+ * done under dcache_lock.
+ *
  */
 void shrink_dcache_anon(struct list_head *head)
 {
@@ -561,8 +612,12 @@ void shrink_dcache_anon(struct list_head
 		spin_lock(&dcache_lock);
 		list_for_each(lp, head) {
 			struct dentry *this = list_entry(lp, struct dentry, d_hash);
+			list_del(&this->d_lru);
+
+			/* don't add non-zero d_count dentries
+			 * back to the d_lru list
+			 */
 			if (!atomic_read(&this->d_count)) {
-				list_del(&this->d_lru);
 				list_add_tail(&this->d_lru, &dentry_unused);
 				found++;
 			}
@@ -630,28 +685,39 @@ struct dentry * d_alloc(struct dentry *
 {
 	char * str;
 	struct dentry *dentry;
+	struct qstr * qstr;
 
 	dentry = kmem_cache_alloc(dentry_cache, GFP_KERNEL);
 	if (!dentry)
 		return NULL;
 
 	if (name->len > DNAME_INLINE_LEN-1) {
-		str = kmalloc(NAME_ALLOC_LEN(name->len), GFP_KERNEL);
-		if (!str) {
+		qstr = kmalloc(sizeof(*qstr) + NAME_ALLOC_LEN(name->len),
+				GFP_KERNEL);
+		if (!qstr) {
 			kmem_cache_free(dentry_cache, dentry);
 			return NULL;
 		}
-	} else
-		str = dentry->d_iname;
+		qstr->name = qstr->name_str;
+		qstr->len = name->len;
+		qstr->hash = name->hash;
+		dentry->d_qstr = qstr;
+		str = qstr->name_str;
+	} else {
+		dentry->d_qstr = &dentry->d_name;
+		str = dentry->d_iname;
+	}
 
 	memcpy(str, name->name, name->len);
 	str[name->len] = 0;
 
 	atomic_set(&dentry->d_count, 1);
-	dentry->d_vfs_flags = 0;
+	dentry->d_vfs_flags = DCACHE_UNHASHED;
+	dentry->d_lock = SPIN_LOCK_UNLOCKED;
 	dentry->d_flags = 0;
 	dentry->d_inode = NULL;
 	dentry->d_parent = NULL;
+	dentry->d_move_count = 0;
 	dentry->d_sb = NULL;
 	dentry->d_name.name = str;
 	dentry->d_name.len = name->len;
@@ -660,6 +726,7 @@ struct dentry * d_alloc(struct dentry *
 	dentry->d_fsdata = NULL;
 	dentry->d_mounted = 0;
 	dentry->d_cookie = NULL;
+	dentry->d_bucket = NULL;
 	INIT_LIST_HEAD(&dentry->d_hash);
 	INIT_LIST_HEAD(&dentry->d_lru);
 	INIT_LIST_HEAD(&dentry->d_subdirs);
@@ -785,12 +852,16 @@ struct dentry * d_alloc_anon(struct inod
 		res = tmp;
 		tmp = NULL;
 		if (res) {
+			spin_lock(&res->d_lock);
 			res->d_sb = inode->i_sb;
 			res->d_parent = res;
 			res->d_inode = inode;
+			res->d_bucket = d_hash(res, res->d_name.hash);
 			res->d_flags |= DCACHE_DISCONNECTED;
+			res->d_vfs_flags &= ~DCACHE_UNHASHED;
 			list_add(&res->d_alias, &inode->i_dentry);
 			list_add(&res->d_hash, &inode->i_sb->s_anon);
+			spin_unlock(&res->d_lock);
 		}
 		inode = NULL; /* don't drop reference */
 	}
@@ -855,50 +926,92 @@ struct dentry *d_splice_alias(struct ino
  * the dentry is found its reference count is incremented and the dentry
  * is returned. The caller must use d_put to free the entry when it has
  * finished using it. %NULL is returned on failure.
+ *
+ * d_lookup is now dcache_lock free. The hash list is protected using RCU.
+ * Memory barriers are used while updating and doing lockless traversal.
+ * To avoid races with d_move while a rename is happening, d_move_count
+ * is used.
+ *
+ * Out-of-bounds reads in memcmp() during a concurrent d_move are avoided
+ * by keeping the length and the name pointer in one structure pointed to
+ * by d_qstr.
+ *
+ * rcu_read_lock() and rcu_read_unlock() are used to disable preemption
+ * while the lookup is going on.
+ *
+ * The d_lru list is not updated, which can leave non-zero d_count
+ * dentries around on the d_lru list.
 */
-
-struct dentry * d_lookup(struct dentry * parent, struct qstr * name)
-{
-	struct dentry * dentry;
-	spin_lock(&dcache_lock);
-	dentry = __d_lookup(parent,name);
-	if (dentry)
-		__dget_locked(dentry);
-	spin_unlock(&dcache_lock);
-	return dentry;
-}
-
-struct dentry * __d_lookup(struct dentry * parent, struct qstr * name)
+struct dentry * d_lookup(struct dentry * parent, struct qstr * name)
 {
 	unsigned int len = name->len;
 	unsigned int hash = name->hash;
 	const unsigned char *str = name->name;
 	struct list_head *head = d_hash(parent,hash);
+	struct dentry *found = NULL;
 	struct list_head *tmp;
+	int lookup_count = 0;
 
-	tmp = head->next;
-	for (;;) {
-		struct dentry * dentry = list_entry(tmp, struct dentry, d_hash);
-		if (tmp == head)
+	rcu_read_lock();
+
+	/* the lookup terminates when the walk reaches any bucket head */
+	for(tmp = head->next; !is_bucket(tmp); tmp = tmp->next) {
+		struct dentry *dentry;
+		unsigned long move_count;
+		struct qstr * qstr;
+
+		smp_read_barrier_depends();
+		dentry = list_entry(tmp, struct dentry, d_hash);
+
+		/* if the lookup ends up in a different bucket
+		 * due to a concurrent rename, fail it
+		 */
+		if (unlikely(dentry->d_bucket != head))
 			break;
-		tmp = tmp->next;
+
+		/* to avoid the race where a dentry keeps coming back to
+		 * its original bucket due to double moves
+		 */
+		if (unlikely(++lookup_count > max_dentries))
+			break;
+
+		/*
+		 * We must take a snapshot of d_move_count followed by a
+		 * read memory barrier before any search key comparison
+		 */
+		move_count = dentry->d_move_count;
+		smp_rmb();
+
 		if (dentry->d_name.hash != hash)
 			continue;
 		if (dentry->d_parent != parent)
 			continue;
+
+		qstr = dentry->d_qstr;
+		smp_read_barrier_depends();
 		if (parent->d_op && parent->d_op->d_compare) {
-			if (parent->d_op->d_compare(parent, &dentry->d_name, name))
+			if (parent->d_op->d_compare(parent, qstr, name))
 				continue;
 		} else {
-			if (dentry->d_name.len != len)
+			if (qstr->len != len)
 				continue;
-			if (memcmp(dentry->d_name.name, str, len))
+			if (memcmp(qstr->name, str, len))
 				continue;
 		}
-		return dentry;
-	}
-	return NULL;
+		spin_lock(&dentry->d_lock);
+		/*
+		 * If the dentry was moved, fail the lookup
+		 */
+		if (unlikely(move_count != dentry->d_move_count))
+			goto terminate;
+		if (!d_unhashed(dentry))
+			found = dget(dentry);
+terminate:
+		spin_unlock(&dentry->d_lock);
+		break;
+	}
+	rcu_read_unlock();
+
+	return found;
 }
 
 /**
@@ -936,8 +1049,11 @@ int d_validate(struct dentry *dentry, st
 	spin_lock(&dcache_lock);
 	lhp = base = d_hash(dparent, dentry->d_name.hash);
 	while ((lhp = lhp->next) != base) {
+		/* read_barrier_depends() is not required for the d_hash
+		 * list as it is parsed under dcache_lock
+		 */
 		if (dentry == list_entry(lhp, struct dentry, d_hash)) {
-			__dget_locked(dentry);
+			dget(dentry);
 			spin_unlock(&dcache_lock);
 			return 1;
 		}
@@ -974,17 +1090,18 @@ void d_delete(struct dentry * dentry)
 	 * Are we the only user?
 	 */
 	spin_lock(&dcache_lock);
+	spin_lock(&dentry->d_lock);
 	if (atomic_read(&dentry->d_count) == 1) {
+		spin_unlock(&dentry->d_lock);
 		dentry_iput(dentry);
 		return;
 	}
-	spin_unlock(&dcache_lock);
 
-	/*
-	 * If not, just drop the dentry and let dput
-	 * pick up the tab..
-	 */
-	d_drop(dentry);
+	if (!d_unhashed(dentry))
+		__d_drop(dentry);
+
+	spin_unlock(&dentry->d_lock);
+	spin_unlock(&dcache_lock);
 }
 
 /**
@@ -997,9 +1114,11 @@ void d_delete(struct dentry * dentry)
 void d_rehash(struct dentry * entry)
 {
 	struct list_head *list = d_hash(entry->d_parent, entry->d_name.hash);
-	if (!d_unhashed(entry)) BUG();
 	spin_lock(&dcache_lock);
-	list_add(&entry->d_hash, list);
+	if (!list_empty(&entry->d_hash) && !d_unhashed(entry)) BUG();
+	entry->d_vfs_flags &= ~DCACHE_UNHASHED;
+	entry->d_bucket = list;
+	list_add_rcu(&entry->d_hash, list);
 	spin_unlock(&dcache_lock);
 }
 
@@ -1021,16 +1140,25 @@ void d_rehash(struct dentry * entry)
 static inline void switch_names(struct dentry * dentry, struct dentry * target)
 {
 	const unsigned char *old_name, *new_name;
+	struct qstr *old_qstr, *new_qstr;
 
 	memcpy(dentry->d_iname, target->d_iname, DNAME_INLINE_LEN);
+	old_qstr = target->d_qstr;
 	old_name = target->d_name.name;
+	new_qstr = dentry->d_qstr;
 	new_name = dentry->d_name.name;
-	if (old_name == target->d_iname)
+	if (old_name == target->d_iname) {
 		old_name = dentry->d_iname;
-	if (new_name == dentry->d_iname)
+		old_qstr = &dentry->d_name;
+	}
+	if (new_name == dentry->d_iname) {
 		new_name = target->d_iname;
+		new_qstr = &target->d_name;
+	}
 	target->d_name.name = new_name;
 	dentry->d_name.name = old_name;
+	target->d_qstr = new_qstr;
+	dentry->d_qstr = old_qstr;
 }
 
 /*
@@ -1057,15 +1185,21 @@ static inline void switch_names(struct d
  * Update the dcache to reflect the move of a file name. Negative
  * dcache entries should not be moved in this way.
  */
-
+
 void d_move(struct dentry * dentry, struct dentry * target)
 {
 	if (!dentry->d_inode)
 		printk(KERN_WARNING "VFS: moving negative dcache entry\n");
 
 	spin_lock(&dcache_lock);
-	/* Move the dentry to the target hash queue */
-	list_move(&dentry->d_hash, &target->d_hash);
+	spin_lock(&dentry->d_lock);
+
+	/* Move the dentry to the target hash queue, if on a different bucket */
+	if (dentry->d_bucket != target->d_bucket) {
+		dentry->d_bucket = target->d_bucket;
+		list_del_rcu(&dentry->d_hash);
+		list_add_rcu(&dentry->d_hash, &target->d_hash);
+	}
 
 	/* Unhash the target: dput() will then get rid of it */
 	__d_drop(target);
@@ -1075,6 +1209,7 @@ void d_move(struct dentry * dentry, stru
 
 	/* Switch the names.. */
 	switch_names(dentry, target);
+	smp_wmb();
 	do_switch(dentry->d_name.len, target->d_name.len);
 	do_switch(dentry->d_name.hash, target->d_name.hash);
 
 	/* ... and switch the parents */
@@ -1092,6 +1227,8 @@ void d_move(struct dentry * dentry, stru
 	write_unlock(&dparent_lock);
 
 	list_add(&dentry->d_child, &dentry->d_parent->d_subdirs);
+	dentry->d_move_count++;
+	spin_unlock(&dentry->d_lock);
 	spin_unlock(&dcache_lock);
 }
 
@@ -1384,6 +1521,9 @@ static void __init dcache_init(unsigned
 	if (!dentry_cache)
 		panic("Cannot create dentry cache");
 
+	/* approximate maximum number of dentries in one hash bucket */
+	max_dentries = (mempages * (PAGE_SIZE / sizeof(struct dentry)));
+
 	set_shrinker(DEFAULT_SEEKS, shrink_dcache_memory);
 
 #if PAGE_SHIFT < 13
@@ -1415,6 +1555,8 @@ static void __init dcache_init(unsigned
 	if (!dentry_hashtable)
 		panic("Failed to allocate dcache hash table\n");
 
+	hashtable_end = dentry_hashtable + nr_hash;
+
 	d = dentry_hashtable;
 	i = nr_hash;
 	do {
diff -puN include/linux/dcache.h~dcache_rcu-main include/linux/dcache.h
--- 25/include/linux/dcache.h~dcache_rcu-main	2003-02-14 18:26:42.000000000 -0800
+++ 25-akpm/include/linux/dcache.h	2003-02-14 18:26:42.000000000 -0800
@@ -7,6 +7,7 @@
 #include <asm/atomic.h>
 #include <linux/list.h>
 #include <linux/spinlock.h>
+#include <linux/rcupdate.h>
 #include <linux/cache.h>
 
 struct vfsmount;
@@ -30,6 +31,7 @@ struct qstr {
 	const unsigned char * name;
 	unsigned int len;
 	unsigned int hash;
+	char name_str[0];
 };
 
 struct dentry_stat_t {
@@ -72,21 +74,26 @@ struct dcookie_struct;
 
 struct dentry {
 	atomic_t d_count;
+	unsigned long d_vfs_flags;	/* moved here to be on the same cacheline */
+	spinlock_t d_lock;		/* per-dentry lock */
 	unsigned int d_flags;
+	unsigned long d_move_count;	/* to indicate a moved dentry during lockless lookup */
 	struct inode  * d_inode;	/* Where the name belongs to - NULL is negative */
 	struct dentry * d_parent;	/* parent directory */
+	struct list_head * d_bucket;	/* lookup hash bucket */
 	struct list_head d_hash;	/* lookup hash list */
-	struct list_head d_lru;		/* d_count = 0 LRU list */
+	struct list_head d_lru;		/* LRU list */
 	struct list_head d_child;	/* child of parent list */
 	struct list_head d_subdirs;	/* our children */
 	struct list_head d_alias;	/* inode alias list */
 	int d_mounted;
 	struct qstr d_name;
+	struct qstr * d_qstr;		/* quick str ptr used in lockless lookup and concurrent d_move */
 	unsigned long d_time;		/* used by d_revalidate */
 	struct dentry_operations  *d_op;
 	struct super_block * d_sb;	/* The root of the dentry tree */
-	unsigned long d_vfs_flags;
 	void * d_fsdata;		/* fs-specific data */
+	struct rcu_head d_rcu;
 	struct dcookie_struct * d_cookie; /* cookie, if any */
 	unsigned char d_iname[DNAME_INLINE_LEN_MIN]; /* small names */
 } ____cacheline_aligned;
@@ -139,6 +146,7 @@ d_iput:		no		no		yes
  */
 
 #define DCACHE_REFERENCED	0x0008  /* Recently used, don't discard. */
+#define DCACHE_UNHASHED		0x0010
 
 extern spinlock_t dcache_lock;
 extern rwlock_t dparent_lock;
@@ -162,7 +170,8 @@ extern rwlock_t dparent_lock;
 
 static __inline__ void __d_drop(struct dentry * dentry)
 {
-	list_del_init(&dentry->d_hash);
+	dentry->d_vfs_flags |= DCACHE_UNHASHED;
+	list_del_rcu(&dentry->d_hash);
 }
 
 static __inline__ void d_drop(struct dentry * dentry)
@@ -229,7 +238,6 @@ extern void d_move(struct dentry *, stru
 
 /* appendix may either be NULL or be used for transname suffixes */
 extern struct dentry * d_lookup(struct dentry *, struct qstr *);
-extern struct dentry * __d_lookup(struct dentry *, struct qstr *);
 
 /* validate "insecure" dentry pointer */
 extern int d_validate(struct dentry *, struct dentry *);
@@ -254,9 +262,8 @@ extern char * d_path(struct dentry *, st
 static __inline__ struct dentry * dget(struct dentry *dentry)
 {
 	if (dentry) {
-		if (!atomic_read(&dentry->d_count))
-			BUG();
 		atomic_inc(&dentry->d_count);
+		dentry->d_vfs_flags |= DCACHE_REFERENCED;
 	}
 	return dentry;
 }
@@ -272,7 +279,7 @@ extern struct dentry * dget_locked(struc
 
 static __inline__ int d_unhashed(struct dentry *dentry)
 {
-	return list_empty(&dentry->d_hash);
+	return (dentry->d_vfs_flags & DCACHE_UNHASHED);
 }
 
 extern void dput(struct dentry *);

_
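
A note for reviewers: the snapshot/recheck trick in point (2) of the
changelog is the heart of the lockless lookup, and it is easy to experiment
with outside the kernel. Below is a minimal user-space sketch of the scheme.
It is illustrative only and not part of the patch: struct fake_dentry,
fake_rename() and fake_lookup() are made-up names, a pthread mutex stands in
for the per-dentry spinlock, and C11 acquire/release atomics stand in for
the smp_rmb()/smp_wmb() pairing around d_move_count.

/*
 * move_count_sketch.c - user-space model of the d_move_count scheme.
 * Build: cc -std=c11 -pthread move_count_sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <string.h>

struct fake_dentry {			/* stand-in for struct dentry */
	pthread_mutex_t d_lock;		/* models the per-dentry spinlock */
	atomic_ulong d_move_count;	/* bumped at the end of each "rename" */
	char name[32];			/* models the dentry name */
};

/* Writer side: d_move() reduced to its essentials. */
static void fake_rename(struct fake_dentry *d, const char *new_name)
{
	pthread_mutex_lock(&d->d_lock);
	strncpy(d->name, new_name, sizeof(d->name) - 1);
	/* release: the name update is visible before the count bump,
	 * the user-space analogue of smp_wmb() in d_move() */
	atomic_fetch_add_explicit(&d->d_move_count, 1, memory_order_release);
	pthread_mutex_unlock(&d->d_lock);
}

/* Reader side: compare the key without the lock, then take the lock and
 * recheck the snapshot, as d_lookup() does before dget().  Returns 1 on a
 * valid match, 0 on mismatch or when a concurrent rename invalidated the
 * comparison (the caller would then fall back to a locked, real lookup). */
static int fake_lookup(struct fake_dentry *d, const char *key)
{
	/* snapshot first (acquire pairs with the release above), the
	 * analogue of reading d_move_count followed by smp_rmb() */
	unsigned long snap =
		atomic_load_explicit(&d->d_move_count, memory_order_acquire);
	int ok;

	if (strcmp(d->name, key) != 0)	/* racy compare, like the RCU walk */
		return 0;

	pthread_mutex_lock(&d->d_lock);
	/* if the count moved, a rename raced with us: fail the lookup */
	ok = (snap == atomic_load_explicit(&d->d_move_count,
					   memory_order_relaxed));
	/* here the real d_lookup() would dget() the dentry */
	pthread_mutex_unlock(&d->d_lock);
	return ok;
}

int main(void)
{
	struct fake_dentry d = {
		.d_lock = PTHREAD_MUTEX_INITIALIZER,
		.name	= "old_name",
	};

	printf("lookup(old_name) = %d\n", fake_lookup(&d, "old_name"));
	fake_rename(&d, "new_name");
	printf("lookup(old_name) = %d\n", fake_lookup(&d, "old_name"));
	printf("lookup(new_name) = %d\n", fake_lookup(&d, "new_name"));
	return 0;
}

The unlocked strcmp() here corresponds to the hash/parent/name checks done
under rcu_read_lock() in d_lookup() above; the locked recheck of the
snapshot corresponds to taking d_lock and comparing d_move_count before
dget(). A failed recheck simply fails the cached lookup and the caller
falls back to a real lookup, exactly as described in point (2).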