diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-07-15 22:52:41 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-07-15 22:56:57 -0400 |
commit | ab9ff0af851e926ebdf4fedda101539b5171db1d (patch) | |
tree | a9c451497c3b9bb5f7ccf3c3c7b758566cb93241 | |
parent | d375361fa6f6cf7e83130d17bc2f5d32f740bb66 (diff) | |
download | grokmirror-ab9ff0af851e926ebdf4fedda101539b5171db1d.tar.gz |
Move objstore and repack ops into single thread
I noticed that we thrash IO a lot when doing objstore and repack
operations for each repo that we fetch inside pull threads. This
introduces a separate "spa" process where these operations are done
lazily, allowing pull threads to move on to the next repos.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | grokmirror/__init__.py | 2 | ||||
-rwxr-xr-x | grokmirror/pull.py | 170 |
2 files changed, 122 insertions, 50 deletions
diff --git a/grokmirror/__init__.py b/grokmirror/__init__.py index a374ab8..7cc2d48 100644 --- a/grokmirror/__init__.py +++ b/grokmirror/__init__.py @@ -211,7 +211,7 @@ def set_repo_timestamp(toplevel, gitdir, ts): def get_repo_obj_info(fullpath): args = ['count-objects', '-v'] retcode, output, error = run_git_command(fullpath, args) - obj_info = {} + obj_info = dict() if output: for line in output.split('\n'): diff --git a/grokmirror/pull.py b/grokmirror/pull.py index 4439759..62fea85 100755 --- a/grokmirror/pull.py +++ b/grokmirror/pull.py @@ -44,9 +44,10 @@ logger = logging.getLogger(__name__) class SignalHandler: - def __init__(self, config, sw, pws, done): + def __init__(self, config, sw, dws, pws, done): self.config = config self.sw = sw + self.dws = dws self.pws = pws self.done = done self.killed = False @@ -54,11 +55,17 @@ class SignalHandler: def _handler(self, signum, frame): self.killed = True logger.debug('Received signum=%s, frame=%s', signum, frame) - self.sw.terminate() - self.sw.join() + if self.sw: + self.sw.terminate() + self.sw.join() + + for dw in self.dws: + if dw and dw.is_alive(): + dw.terminate() + dw.join() for pw in self.pws: - if pw: + if pw and pw.is_alive(): pw.terminate() pw.join() @@ -164,7 +171,56 @@ def build_optimal_forkgroups(l_manifest, r_manifest, toplevel, obstdir): return forkgroups -def pull_worker(config, q_pull, q_done): +def spa_worker(config, q_diet): + toplevel = os.path.realpath(config['core'].get('toplevel')) + while True: + try: + (gitdir, actions) = q_diet.get(timeout=1) + except queue.Empty: + return + + logger.debug('spa_worker: gitdir=%s, actions=%s', gitdir, actions) + fullpath = os.path.join(toplevel, gitdir.lstrip('/')) + try: + grokmirror.lock_repo(fullpath, nonblocking=True) + except IOError: + # We'll get it during grok-fsck + continue + + logger.info(' in-spa: %s', gitdir) + done = list() + for action in actions: + if action in done: + continue + done.append(action) + if action == 'objstore': + altrepo = 
grokmirror.get_altrepo(fullpath) + grokmirror.fetch_objstore_repo(altrepo, fullpath) + + elif action == 'repack': + logger.debug('quick-repacking %s', fullpath) + args = ['repack', '-Adlq'] + ecode, out, err = grokmirror.run_git_command(fullpath, args) + if ecode > 0: + logger.debug('Could not repack %s', fullpath) + + elif action == 'packrefs': + args = ['pack-refs'] + ecode, out, err = grokmirror.run_git_command(fullpath, args) + if ecode > 0: + logger.debug('Could not pack-refs %s', fullpath) + + elif action == 'packrefs-all': + args = ['pack-refs', '--all'] + ecode, out, err = grokmirror.run_git_command(fullpath, args) + if ecode > 0: + logger.debug('Could not pack-refs %s', fullpath) + + grokmirror.unlock_repo(fullpath) + logger.info(' dieted: %s (%s)', gitdir, ', '.join(done)) + + +def pull_worker(config, q_pull, q_diet, q_done): toplevel = os.path.realpath(config['core'].get('toplevel')) obstdir = os.path.realpath(config['core'].get('objstore')) maxretries = config['pull'].getint('retries', 3) @@ -176,10 +232,10 @@ def pull_worker(config, q_pull, q_done): except queue.Empty: return - logger.debug('gitdir=%s, action=%s', gitdir, action) + logger.debug('pull_worker: gitdir=%s, action=%s', gitdir, action) fullpath = os.path.join(toplevel, gitdir.lstrip('/')) success = True - repack = False + diet_actions = list() try: grokmirror.lock_repo(fullpath, nonblocking=True) @@ -251,24 +307,29 @@ def pull_worker(config, q_pull, q_done): repoinfo['fingerprint'] = post_pull_fp if post_pull_fp != my_fp: grokmirror.set_repo_fingerprint(toplevel, gitdir, fingerprint=post_pull_fp) - action = 'objstore' + altrepo = grokmirror.get_altrepo(fullpath) + if altrepo and grokmirror.is_obstrepo(altrepo, obstdir) and not repoinfo.get('private'): + # do we have any objects in the objstore repo? 
+ o_obj_info = grokmirror.get_repo_obj_info(altrepo) + if o_obj_info.get('count') == '0': + # We fetch right now, as other repos may be waiting on these objects + logger.info(' objstore: %s', gitdir) + grokmirror.fetch_objstore_repo(altrepo, fullpath) + else: + # We lazy-fetch in the spa + diet_actions.append('objstore') + diet_actions.append('repack') if my_fp is None: # This was the initial clone, so pack all refs - logger.info(' packrefs: %s', gitdir) - args = ['pack-refs', '--all'] - ecode, out, err = grokmirror.run_git_command(fullpath, args) - if ecode > 0: - # We don't really care if it fails, though it really shouldn't - logger.debug('Could not pack-refs in %s', fullpath) - # Run a bigger repack if objstore is involved - repack = True + diet_actions.append('packrefs-all') else: + diet_actions.append('packrefs') if not grokmirror.is_precious(fullpath): # See if doing a quick repack would be beneficial obj_info = grokmirror.get_repo_obj_info(fullpath) if grokmirror.get_repack_level(obj_info): # We only do quick repacks, so we don't care about precise level - repack = True + diet_actions.append('repack') modified = repoinfo.get('modified') if modified is not None: @@ -277,30 +338,8 @@ def pull_worker(config, q_pull, q_done): logger.debug('FP match, not pulling %s', gitdir) if action == 'objstore_migrate': - repack = True - action = 'objstore' - - if action == 'objstore': - altrepo = grokmirror.get_altrepo(fullpath) - if altrepo and grokmirror.is_obstrepo(altrepo, obstdir) and not repoinfo.get('private'): - logger.info(' objstore: %s', gitdir) - grokmirror.fetch_objstore_repo(altrepo, fullpath) - if repack: - action = 'repack' - - if action == 'repack': - logger.debug('quick-repacking %s', fullpath) - args = ['repack', '-Adlq'] - logger.info(' repack: %s', gitdir) - ecode, out, err = grokmirror.run_git_command(fullpath, args) - if ecode > 0: - logger.debug('Could not repack %s', fullpath) - else: - logger.info(' packrefs: %s', gitdir) - args = ['pack-refs'] - 
ecode, out, err = grokmirror.run_git_command(fullpath, args) - if ecode > 0: - logger.debug('Could not pack-refs %s', fullpath) + diet_actions.append('objstore') + diet_actions.append('repack') symlinks = repoinfo.get('symlinks') if os.path.exists(fullpath) and symlinks: @@ -327,6 +366,8 @@ def pull_worker(config, q_pull, q_done): grokmirror.unlock_repo(fullpath) q_done.put((gitdir, repoinfo, q_action, success)) + if diet_actions: + q_diet.put((gitdir, diet_actions)) def cull_manifest(manifest, config): @@ -875,6 +916,24 @@ def socket_worker(config, q_todo, sockfile): server.serve_forever() +def showstats(q_todo, q_pull, q_diet, good, bad, pws, dws): + stats = list() + if good: + stats.append('%s done' % good) + if pws: + stats.append('%s active' % len(pws)) + if not q_pull.empty(): + stats.append('%s queued' % q_pull.qsize()) + if not q_todo.empty(): + stats.append('%s waiting' % q_todo.qsize()) + if not q_diet.empty(): + stats.append('%s dieting' % (q_diet.qsize() + len(dws))) + if bad: + stats.append('%s failed' % bad) + + logger.info(' ---: %s', ', '.join(stats)) + + def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=False): global logger @@ -915,6 +974,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce= q_todo = mp.Queue() q_pull = mp.Queue() q_done = mp.Queue() + q_diet = mp.Queue() sw = None sockfile = config['pull'].get('socket') @@ -930,6 +990,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce= sw.start() pws = list() + dws = list() actions = set() new_actions = fill_todo_from_manifest(config, actions, nomtime=nomtime, forcepurge=forcepurge) if not len(new_actions) and runonce: @@ -952,16 +1013,28 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce= loopmark = None saidsleeping = False lastrun = time.time() - with SignalHandler(config, sw, pws, done): + with SignalHandler(config, sw, dws, pws, done): while True: for pw in pws: - 
if not pw.is_alive(): + if pw and not pw.is_alive(): pws.remove(pw) logger.info(' worker: terminated (%s remaining)', len(pws)) - logger.info(' ---: %s done, %s active, %s queued, %s waiting, %s failed', - good, len(pws), q_pull.qsize(), q_todo.qsize(), bad) + showstats(q_todo, q_pull, q_diet, good, bad, pws, dws) + continue + + for dw in dws: + if dw and not dw.is_alive(): + dws.remove(dw) + logger.info(' spa: terminated') + showstats(q_todo, q_pull, q_diet, good, bad, pws, dws) continue + if not q_diet.empty() and not len(dws): + dw = mp.Process(target=spa_worker, args=(config, q_diet)) + dw.start() + dws.append(dw) + logger.info(' spa: started') + # Any new results? try: gitdir, repoinfo, q_action, success = q_done.get_nowait() @@ -979,8 +1052,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce= else: bad += 1 logger.info(' done: %s', gitdir) - logger.info(' ---: %s done, %s active, %s queued, %s waiting, %s failed', - good, len(pws), q_pull.qsize(), q_todo.qsize(), bad) + showstats(q_todo, q_pull, q_diet, good, bad, pws, dws) if len(done) >= 100: # Write manifest every 100 repos update_manifest(config, done) @@ -1029,14 +1101,14 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce= loopmark = gitdir elif loopmark == gitdir: # sleep for a bit if we recognize that we've looped around all waiting repos - time.sleep(0.1) + time.sleep(0.5) continue if gitdir == loopmark: loopmark = None if len(pws) < pull_threads: - pw = mp.Process(target=pull_worker, args=(config, q_pull, q_done)) + pw = mp.Process(target=pull_worker, args=(config, q_pull, q_diet, q_done)) pw.start() pws.append(pw) logger.info(' worker: started (%s running)', len(pws)) |