From: David Teigland The first lock taken on an rsb is treated specially because the resource master needs to be looked up. There were some potential problems with this during recovery, and the whole thing was becoming too complex. This simplifies the special first-lock case and solves the problems. Signed-off-by: David Teigland Signed-off-by: Andrew Morton --- drivers/dlm/debug_fs.c | 9 ++ drivers/dlm/dlm_internal.h | 5 - drivers/dlm/lock.c | 146 +++++++++++++++++---------------------------- 3 files changed, 65 insertions(+), 95 deletions(-) diff -puN drivers/dlm/debug_fs.c~dlm-better-handling-of-first-lock drivers/dlm/debug_fs.c --- devel/drivers/dlm/debug_fs.c~dlm-better-handling-of-first-lock 2005-07-16 13:36:00.000000000 -0700 +++ devel-akpm/drivers/dlm/debug_fs.c 2005-07-16 13:36:00.000000000 -0700 @@ -99,11 +99,16 @@ static int print_resource(struct dlm_rsb else seq_printf(s, "%c", '.'); } - if (res->res_nodeid) + if (res->res_nodeid > 0) seq_printf(s, "\" \nLocal Copy, Master is node %d\n", res->res_nodeid); - else + else if (res->res_nodeid == 0) seq_printf(s, "\" \nMaster Copy\n"); + else if (res->res_nodeid == -1) + seq_printf(s, "\" \nLooking up master (lkid %x)\n", + res->res_first_lkid); + else + seq_printf(s, "\" \nInvalid master %d\n", res->res_nodeid); /* Print the LVB: */ if (res->res_lvbptr) { diff -puN drivers/dlm/dlm_internal.h~dlm-better-handling-of-first-lock drivers/dlm/dlm_internal.h --- devel/drivers/dlm/dlm_internal.h~dlm-better-handling-of-first-lock 2005-07-16 13:36:00.000000000 -0700 +++ devel-akpm/drivers/dlm/dlm_internal.h 2005-07-16 13:36:00.000000000 -0700 @@ -272,8 +272,8 @@ struct dlm_rsb { uint32_t res_lvbseq; uint32_t res_bucket; /* rsbtbl */ unsigned long res_toss_time; - uint32_t res_trial_lkid; /* lkb trying lookup result */ - struct list_head res_lookup; /* lkbs waiting lookup confirm*/ + uint32_t res_first_lkid; + struct list_head res_lookup; /* lkbs waiting on first */ struct list_head res_hashchain; /* rsbtbl */ struct list_head res_grantqueue; struct list_head res_convertqueue; @@ -295,7 +295,6 @@ struct dlm_rsb { /* rsb_flags */ enum rsb_flags { - RSB_MASTER_WAIT, RSB_MASTER_UNCERTAIN, RSB_VALNOTVALID, RSB_VALNOTVALID_PREV, diff -puN drivers/dlm/lock.c~dlm-better-handling-of-first-lock drivers/dlm/lock.c --- devel/drivers/dlm/lock.c~dlm-better-handling-of-first-lock 2005-07-16 13:36:00.000000000 -0700 +++ devel-akpm/drivers/dlm/lock.c 2005-07-16 13:36:00.000000000 -0700 @@ -112,6 +112,7 @@ static const int __dlm_compat_matrix[8][ * 0 = LVB is written to the resource * -1 = nothing happens to the LVB */ + const int dlm_lvb_operations[8][8] = { /* UN NL CR CW PR PW EX PD*/ { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ @@ -162,8 +163,8 @@ void dlm_print_lkb(struct dlm_lkb *lkb) void dlm_print_rsb(struct dlm_rsb *r) { - printk(KERN_ERR "rsb: nodeid %d flags %lx trial %x rlc %d name %s\n", - r->res_nodeid, r->res_flags, r->res_trial_lkid, + printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", + r->res_nodeid, r->res_flags, r->res_first_lkid, r->res_recover_locks_count, r->res_name); } @@ -317,16 +318,13 @@ static int _search_rsb(struct dlm_ls *ls list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list); if (r->res_nodeid == -1) { - rsb_clear_flag(r, RSB_MASTER_WAIT); rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); - r->res_trial_lkid = 0; + r->res_first_lkid = 0; } else if (r->res_nodeid > 0) { - rsb_clear_flag(r, RSB_MASTER_WAIT); rsb_set_flag(r, RSB_MASTER_UNCERTAIN); - r->res_trial_lkid = 0; + r->res_first_lkid = 0; } else { DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); - DLM_ASSERT(!rsb_flag(r, RSB_MASTER_WAIT), dlm_print_rsb(r);); DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); } out: @@ -1398,19 +1396,10 @@ static void send_blocking_asts_all(struc lookup reply. Other lkb's waiting for the same rsb lookup are kept on the rsb's res_lookup list until the master is verified. - After a remote lookup or when a tossed rsb is retrived that specifies - a remote master, that master value is uncertain -- it may have changed - by the time we send it a request. While it's uncertain, only one lkb - is allowed to go ahead and use the master value; that lkb is specified - by res_trial_lkid. Once the trial lkb is queued on the master node - we know the rsb master is correct and any other lkbs on res_lookup - can get the rsb nodeid and go ahead with their request. - Return values: 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 1: the rsb master is not available and the lkb has been placed on a wait queue - -EXXX: there was some error in processing */ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -1420,42 +1409,32 @@ static int set_master(struct dlm_rsb *r, if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); - rsb_set_flag(r, RSB_MASTER_WAIT); - r->res_trial_lkid = lkb->lkb_id; + r->res_first_lkid = lkb->lkb_id; lkb->lkb_nodeid = r->res_nodeid; return 0; } - if (r->res_nodeid == 0) { - lkb->lkb_nodeid = 0; - return 0; + if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { + list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); + return 1; } - if (r->res_trial_lkid == lkb->lkb_id) { - DLM_ASSERT(lkb->lkb_id, dlm_print_lkb(lkb);); - lkb->lkb_nodeid = r->res_nodeid; + if (r->res_nodeid == 0) { + lkb->lkb_nodeid = 0; return 0; } - if (rsb_flag(r, RSB_MASTER_WAIT)) { - list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); - return 1; - } - if (r->res_nodeid > 0) { lkb->lkb_nodeid = r->res_nodeid; return 0; } - /* This is the first lkb requested on this rsb since the rsb - was created. We need to figure out who the rsb master is. */ - - DLM_ASSERT(r->res_nodeid == -1, ); + DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r);); dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid != our_nodeid) { - rsb_set_flag(r, RSB_MASTER_WAIT); + r->res_first_lkid = lkb->lkb_id; send_lookup(r, lkb); return 1; } @@ -1476,75 +1455,58 @@ static int set_master(struct dlm_rsb *r, } if (ret_nodeid == our_nodeid) { + r->res_first_lkid = 0; r->res_nodeid = 0; lkb->lkb_nodeid = 0; - return 0; + } else { + r->res_first_lkid = lkb->lkb_id; + r->res_nodeid = ret_nodeid; + lkb->lkb_nodeid = ret_nodeid; } - - rsb_set_flag(r, RSB_MASTER_WAIT); - r->res_trial_lkid = lkb->lkb_id; - r->res_nodeid = ret_nodeid; - lkb->lkb_nodeid = ret_nodeid; return 0; } -/* confirm_master -- confirm (or deny) an rsb's master nodeid +static void process_lookup_list(struct dlm_rsb *r) +{ + struct dlm_lkb *lkb, *safe; - This is called when we get a request reply from a remote node - who we believe is the master. The return value (error) we got - back indicates whether it's really the master or not. If it - wasn't, we need to start over and do another master lookup. If - it was and our lock was queued, then we know the master won't - change. If it was and our lock wasn't queued, we need to do - another trial with the next lkb. -*/ + list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { + list_del(&lkb->lkb_rsb_lookup); + _request_lock(r, lkb); + schedule(); + } +} + +/* confirm_master -- confirm (or deny) an rsb's master nodeid */ static void confirm_master(struct dlm_rsb *r, int error) { - struct dlm_lkb *lkb, *safe; + struct dlm_lkb *lkb; - if (!rsb_flag(r, RSB_MASTER_WAIT)) + if (!r->res_first_lkid) return; switch (error) { case 0: case -EINPROGRESS: - /* the remote master queued our request, or - the remote dir node told us we're the master */ - - rsb_clear_flag(r, RSB_MASTER_WAIT); - r->res_trial_lkid = 0; - - list_for_each_entry_safe(lkb, safe, &r->res_lookup, - lkb_rsb_lookup) { - list_del(&lkb->lkb_rsb_lookup); - _request_lock(r, lkb); - schedule(); - } + r->res_first_lkid = 0; + process_lookup_list(r); break; case -EAGAIN: /* the remote master didn't queue our NOQUEUE request; - do another trial with the next waiting lkb */ + make a waiting lkb the first_lkid */ + + r->res_first_lkid = 0; if (!list_empty(&r->res_lookup)) { lkb = list_entry(r->res_lookup.next, struct dlm_lkb, lkb_rsb_lookup); list_del(&lkb->lkb_rsb_lookup); - r->res_trial_lkid = lkb->lkb_id; + r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); - break; - } - /* fall through so the rsb looks new */ - - case -ENOENT: - case -ENOTBLK: - /* the remote master wasn't really the master, i.e. our - trial failed; so we start over with another lookup */ - - r->res_nodeid = -1; - r->res_trial_lkid = 0; - rsb_clear_flag(r, RSB_MASTER_WAIT); + } else + r->res_nodeid = -1; break; default: @@ -2799,7 +2761,6 @@ static void receive_request_reply(struct if (mstype == DLM_MSG_LOOKUP) { r->res_nodeid = ms->m_header.h_nodeid; lkb->lkb_nodeid = r->res_nodeid; - r->res_trial_lkid = lkb->lkb_id; } switch (error) { @@ -2829,13 +2790,7 @@ static void receive_request_reply(struct case -ENOENT: case -ENOTBLK: /* find_rsb failed to find rsb or rsb wasn't master */ - - DLM_ASSERT(rsb_flag(r, RSB_MASTER_WAIT), - log_print("receive_request_reply error %d", error); - dlm_print_lkb(lkb); - dlm_print_rsb(r);); - - confirm_master(r, error); + r->res_nodeid = -1; lkb->lkb_nodeid = -1; _request_lock(r, lkb); break; @@ -3038,17 +2993,19 @@ static void receive_lookup_reply(struct lock_rsb(r); ret_nodeid = ms->m_nodeid; - if (ret_nodeid == dlm_our_nodeid()) - r->res_nodeid = ret_nodeid = 0; - else { + if (ret_nodeid == dlm_our_nodeid()) { + r->res_nodeid = 0; + ret_nodeid = 0; + r->res_first_lkid = 0; + } else { + /* set_master() will copy res_nodeid to lkb_nodeid */ r->res_nodeid = ret_nodeid; - r->res_trial_lkid = lkb->lkb_id; } _request_lock(r, lkb); if (!ret_nodeid) - confirm_master(r, 0); + process_lookup_list(r); unlock_rsb(r); put_rsb(r); @@ -3311,6 +3268,15 @@ int dlm_recover_waiters_post(struct dlm_ switch (mstype) { case DLM_MSG_LOOKUP: + hold_rsb(r); + lock_rsb(r); + _request_lock(r, lkb); + if (is_master(r)) + confirm_master(r, 0); + unlock_rsb(r); + put_rsb(r); + break; + case DLM_MSG_REQUEST: hold_rsb(r); lock_rsb(r); _