author     Konstantin Ryabitsev <konstantin@linuxfoundation.org>   2020-07-15 22:52:41 -0400
committer  Konstantin Ryabitsev <konstantin@linuxfoundation.org>   2020-07-15 22:56:57 -0400
commit     ab9ff0af851e926ebdf4fedda101539b5171db1d (patch)
tree       a9c451497c3b9bb5f7ccf3c3c7b758566cb93241
parent     d375361fa6f6cf7e83130d17bc2f5d32f740bb66 (diff)
download   grokmirror-ab9ff0af851e926ebdf4fedda101539b5171db1d.tar.gz
Move objstore and repack ops into single thread

I noticed that we thrash IO a lot when doing objstore and repack
operations for each repo that we fetch inside pull threads. This
introduces a separate "spa" process where these operations are done
lazily, allowing pull threads to move on to the next repos.

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
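
As a side note for reviewers, the shape of the lazy "spa" pattern is roughly the
following minimal, self-contained sketch (not the patch itself; do_maintenance()
is a hypothetical stand-in for the real objstore/repack/pack-refs work). Pull
workers put (gitdir, actions) tuples on a queue, and a single background process
drains it and exits once the queue has stayed empty for a second.

import multiprocessing as mp
import queue

def do_maintenance(gitdir, actions):
    # Hypothetical stand-in for the real work (objstore fetch, repack, pack-refs)
    print('spa: %s -> %s' % (gitdir, ', '.join(actions)))

def spa_worker(q_diet):
    # Drain the queue lazily; exit once it has been empty for a second,
    # so the parent can start a fresh spa process when more work shows up
    while True:
        try:
            gitdir, actions = q_diet.get(timeout=1)
        except queue.Empty:
            return
        do_maintenance(gitdir, actions)

if __name__ == '__main__':
    q_diet = mp.Queue()
    q_diet.put(('/pub/scm/example.git', ['objstore', 'repack']))
    dw = mp.Process(target=spa_worker, args=(q_diet,))
    dw.start()
    dw.join()

The parent simply starts a fresh spa process whenever the queue is non-empty and
no spa process is running, which is what pull_mirror() does in the patch below.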
-rw-r--r--  grokmirror/__init__.py |   2
-rwxr-xr-x  grokmirror/pull.py     | 170
2 files changed, 122 insertions(+), 50 deletions(-)
diff --git a/grokmirror/__init__.py b/grokmirror/__init__.py
index a374ab8..7cc2d48 100644
--- a/grokmirror/__init__.py
+++ b/grokmirror/__init__.py
@@ -211,7 +211,7 @@ def set_repo_timestamp(toplevel, gitdir, ts):
 def get_repo_obj_info(fullpath):
     args = ['count-objects', '-v']
     retcode, output, error = run_git_command(fullpath, args)
-    obj_info = {}
+    obj_info = dict()
     if output:
         for line in output.split('\n'):
diff --git a/grokmirror/pull.py b/grokmirror/pull.py
index 4439759..62fea85 100755
--- a/grokmirror/pull.py
+++ b/grokmirror/pull.py
@@ -44,9 +44,10 @@ logger = logging.getLogger(__name__)
 class SignalHandler:
-    def __init__(self, config, sw, pws, done):
+    def __init__(self, config, sw, dws, pws, done):
         self.config = config
         self.sw = sw
+        self.dws = dws
         self.pws = pws
         self.done = done
         self.killed = False
@@ -54,11 +55,17 @@ class SignalHandler:
     def _handler(self, signum, frame):
         self.killed = True
         logger.debug('Received signum=%s, frame=%s', signum, frame)
-        self.sw.terminate()
-        self.sw.join()
+        if self.sw:
+            self.sw.terminate()
+            self.sw.join()
+
+        for dw in self.dws:
+            if dw and dw.is_alive():
+                dw.terminate()
+                dw.join()
         for pw in self.pws:
-            if pw:
+            if pw and pw.is_alive():
                 pw.terminate()
                 pw.join()
@@ -164,7 +171,56 @@ def build_optimal_forkgroups(l_manifest, r_manifest, toplevel, obstdir):
     return forkgroups
-def pull_worker(config, q_pull, q_done):
+def spa_worker(config, q_diet):
+    toplevel = os.path.realpath(config['core'].get('toplevel'))
+    while True:
+        try:
+            (gitdir, actions) = q_diet.get(timeout=1)
+        except queue.Empty:
+            return
+
+        logger.debug('spa_worker: gitdir=%s, actions=%s', gitdir, actions)
+        fullpath = os.path.join(toplevel, gitdir.lstrip('/'))
+        try:
+            grokmirror.lock_repo(fullpath, nonblocking=True)
+        except IOError:
+            # We'll get it during grok-fsck
+            continue
+
+        logger.info(' in-spa: %s', gitdir)
+        done = list()
+        for action in actions:
+            if action in done:
+                continue
+            done.append(action)
+            if action == 'objstore':
+                altrepo = grokmirror.get_altrepo(fullpath)
+                grokmirror.fetch_objstore_repo(altrepo, fullpath)
+
+            elif action == 'repack':
+                logger.debug('quick-repacking %s', fullpath)
+                args = ['repack', '-Adlq']
+                ecode, out, err = grokmirror.run_git_command(fullpath, args)
+                if ecode > 0:
+                    logger.debug('Could not repack %s', fullpath)
+
+            elif action == 'packrefs':
+                args = ['pack-refs']
+                ecode, out, err = grokmirror.run_git_command(fullpath, args)
+                if ecode > 0:
+                    logger.debug('Could not pack-refs %s', fullpath)
+
+            elif action == 'packrefs-all':
+                args = ['pack-refs', '--all']
+                ecode, out, err = grokmirror.run_git_command(fullpath, args)
+                if ecode > 0:
+                    logger.debug('Could not pack-refs %s', fullpath)
+
+        grokmirror.unlock_repo(fullpath)
+        logger.info(' dieted: %s (%s)', gitdir, ', '.join(done))
+
+
+def pull_worker(config, q_pull, q_diet, q_done):
     toplevel = os.path.realpath(config['core'].get('toplevel'))
     obstdir = os.path.realpath(config['core'].get('objstore'))
     maxretries = config['pull'].getint('retries', 3)
@@ -176,10 +232,10 @@ def pull_worker(config, q_pull, q_done):
         except queue.Empty:
             return
-        logger.debug('gitdir=%s, action=%s', gitdir, action)
+        logger.debug('pull_worker: gitdir=%s, action=%s', gitdir, action)
         fullpath = os.path.join(toplevel, gitdir.lstrip('/'))
         success = True
-        repack = False
+        diet_actions = list()
         try:
             grokmirror.lock_repo(fullpath, nonblocking=True)
@@ -251,24 +307,29 @@ def pull_worker(config, q_pull, q_done):
                         repoinfo['fingerprint'] = post_pull_fp
                         if post_pull_fp != my_fp:
                             grokmirror.set_repo_fingerprint(toplevel, gitdir, fingerprint=post_pull_fp)
-                            action = 'objstore'
+                            altrepo = grokmirror.get_altrepo(fullpath)
+                            if altrepo and grokmirror.is_obstrepo(altrepo, obstdir) and not repoinfo.get('private'):
+                                # do we have any objects in the objstore repo?
+                                o_obj_info = grokmirror.get_repo_obj_info(altrepo)
+                                if o_obj_info.get('count') == '0':
+                                    # We fetch right now, as other repos may be waiting on these objects
+                                    logger.info(' objstore: %s', gitdir)
+                                    grokmirror.fetch_objstore_repo(altrepo, fullpath)
+                                else:
+                                    # We lazy-fetch in the spa
+                                    diet_actions.append('objstore')
+                                diet_actions.append('repack')
                             if my_fp is None:
                                 # This was the initial clone, so pack all refs
-                                logger.info(' packrefs: %s', gitdir)
-                                args = ['pack-refs', '--all']
-                                ecode, out, err = grokmirror.run_git_command(fullpath, args)
-                                if ecode > 0:
-                                    # We don't really care if it fails, though it really shouldn't
-                                    logger.debug('Could not pack-refs in %s', fullpath)
-                                # Run a bigger repack if objstore is involved
-                                repack = True
+                                diet_actions.append('packrefs-all')
                             else:
+                                diet_actions.append('packrefs')
                                 if not grokmirror.is_precious(fullpath):
                                     # See if doing a quick repack would be beneficial
                                     obj_info = grokmirror.get_repo_obj_info(fullpath)
                                     if grokmirror.get_repack_level(obj_info):
                                         # We only do quick repacks, so we don't care about precise level
-                                        repack = True
+                                        diet_actions.append('repack')
                         modified = repoinfo.get('modified')
                         if modified is not None:
@@ -277,30 +338,8 @@ def pull_worker(config, q_pull, q_done):
                     logger.debug('FP match, not pulling %s', gitdir)
             if action == 'objstore_migrate':
-                repack = True
-                action = 'objstore'
-
-            if action == 'objstore':
-                altrepo = grokmirror.get_altrepo(fullpath)
-                if altrepo and grokmirror.is_obstrepo(altrepo, obstdir) and not repoinfo.get('private'):
-                    logger.info(' objstore: %s', gitdir)
-                    grokmirror.fetch_objstore_repo(altrepo, fullpath)
-                    if repack:
-                        action = 'repack'
-
-            if action == 'repack':
-                logger.debug('quick-repacking %s', fullpath)
-                args = ['repack', '-Adlq']
-                logger.info(' repack: %s', gitdir)
-                ecode, out, err = grokmirror.run_git_command(fullpath, args)
-                if ecode > 0:
-                    logger.debug('Could not repack %s', fullpath)
-                else:
-                    logger.info(' packrefs: %s', gitdir)
-                    args = ['pack-refs']
-                    ecode, out, err = grokmirror.run_git_command(fullpath, args)
-                    if ecode > 0:
-                        logger.debug('Could not pack-refs %s', fullpath)
+                diet_actions.append('objstore')
+                diet_actions.append('repack')
             symlinks = repoinfo.get('symlinks')
             if os.path.exists(fullpath) and symlinks:
@@ -327,6 +366,8 @@ def pull_worker(config, q_pull, q_done):
         grokmirror.unlock_repo(fullpath)
         q_done.put((gitdir, repoinfo, q_action, success))
+        if diet_actions:
+            q_diet.put((gitdir, diet_actions))
 def cull_manifest(manifest, config):
@@ -875,6 +916,24 @@ def socket_worker(config, q_todo, sockfile):
         server.serve_forever()
+def showstats(q_todo, q_pull, q_diet, good, bad, pws, dws):
+    stats = list()
+    if good:
+        stats.append('%s done' % good)
+    if pws:
+        stats.append('%s active' % len(pws))
+    if not q_pull.empty():
+        stats.append('%s queued' % q_pull.qsize())
+    if not q_todo.empty():
+        stats.append('%s waiting' % q_todo.qsize())
+    if not q_diet.empty():
+        stats.append('%s dieting' % (q_diet.qsize() + len(dws)))
+    if bad:
+        stats.append('%s failed' % bad)
+
+    logger.info(' ---: %s', ', '.join(stats))
+
+
 def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=False):
     global logger
@@ -915,6 +974,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=
     q_todo = mp.Queue()
     q_pull = mp.Queue()
     q_done = mp.Queue()
+    q_diet = mp.Queue()
     sw = None
     sockfile = config['pull'].get('socket')
@@ -930,6 +990,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=
         sw.start()
     pws = list()
+    dws = list()
     actions = set()
     new_actions = fill_todo_from_manifest(config, actions, nomtime=nomtime, forcepurge=forcepurge)
     if not len(new_actions) and runonce:
@@ -952,16 +1013,28 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=
     loopmark = None
     saidsleeping = False
     lastrun = time.time()
-    with SignalHandler(config, sw, pws, done):
+    with SignalHandler(config, sw, dws, pws, done):
         while True:
             for pw in pws:
-                if not pw.is_alive():
+                if pw and not pw.is_alive():
                     pws.remove(pw)
                     logger.info(' worker: terminated (%s remaining)', len(pws))
-                    logger.info(' ---: %s done, %s active, %s queued, %s waiting, %s failed',
-                                good, len(pws), q_pull.qsize(), q_todo.qsize(), bad)
+                    showstats(q_todo, q_pull, q_diet, good, bad, pws, dws)
+                    continue
+
+            for dw in dws:
+                if dw and not dw.is_alive():
+                    dws.remove(dw)
+                    logger.info(' spa: terminated')
+                    showstats(q_todo, q_pull, q_diet, good, bad, pws, dws)
                     continue
+            if not q_diet.empty() and not len(dws):
+                dw = mp.Process(target=spa_worker, args=(config, q_diet))
+                dw.start()
+                dws.append(dw)
+                logger.info(' spa: started')
+
             # Any new results?
             try:
                 gitdir, repoinfo, q_action, success = q_done.get_nowait()
@@ -979,8 +1052,7 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=
                 else:
                     bad += 1
                 logger.info(' done: %s', gitdir)
-                logger.info(' ---: %s done, %s active, %s queued, %s waiting, %s failed',
-                            good, len(pws), q_pull.qsize(), q_todo.qsize(), bad)
+                showstats(q_todo, q_pull, q_diet, good, bad, pws, dws)
                 if len(done) >= 100:
                     # Write manifest every 100 repos
                     update_manifest(config, done)
@@ -1029,14 +1101,14 @@ def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=
                     loopmark = gitdir
                 elif loopmark == gitdir:
                     # sleep for a bit if we recognize that we've looped around all waiting repos
-                    time.sleep(0.1)
+                    time.sleep(0.5)
                 continue
             if gitdir == loopmark:
                 loopmark = None
             if len(pws) < pull_threads:
-                pw = mp.Process(target=pull_worker, args=(config, q_pull, q_done))
+                pw = mp.Process(target=pull_worker, args=(config, q_pull, q_diet, q_done))
                 pw.start()
                 pws.append(pw)
                 logger.info(' worker: started (%s running)', len(pws))