diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-08-29 20:01:59 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-08-29 20:12:38 -0400 |
commit | 7c6fb23d8e70e0b06883373b92f66b580cfff745 (patch) | |
tree | 239a75fc125d834fc2f974650c0a739979f1d601 | |
parent | 7cbe77af6be0f8726cc273ed39251ccc1a4dab8f (diff) | |
download | grokmirror-7c6fb23d8e70e0b06883373b92f66b580cfff745.tar.gz |
Move pull worker spawning up in the cycle
Avoid potential problems when q_todo is drained.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-x | grokmirror/pull.py | 71 |
1 file changed, 37 insertions, 34 deletions
diff --git a/grokmirror/pull.py b/grokmirror/pull.py index d3b4534..940b130 100755 --- a/grokmirror/pull.py +++ b/grokmirror/pull.py @@ -615,19 +615,8 @@ def write_projects_list(config, manifest): def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False): # l_ = local, r_ = remote l_mani_path = config['core'].get('manifest') - r_mani_status_path = os.path.join(os.path.dirname(l_mani_path), '.%s.remote' % os.path.basename(l_mani_path)) - try: - with open(r_mani_status_path, 'r') as fh: - r_mani_status = json.loads(fh.read()) - except (IOError, json.JSONDecodeError): - logger.debug('Could not read %s', r_mani_status_path) - r_mani_status = dict() - r_last_fetched = r_mani_status.get('last-fetched', 0) - config_last_modified = r_mani_status.get('config-last-modified', 0) - if config_last_modified != config.last_modified: - nomtime = True - r_mani_cmd = config['remote'].get('manifest_command') + if r_mani_cmd: if not os.access(r_mani_cmd, os.X_OK): logger.critical('Remote manifest command is not executable: %s', r_mani_cmd) @@ -658,6 +647,17 @@ def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False): raise IOError('Empty manifest returned by %s' % r_mani_cmd) else: + r_mani_status_path = os.path.join(os.path.dirname(l_mani_path), '.%s.remote' % os.path.basename(l_mani_path)) + try: + with open(r_mani_status_path, 'r') as fh: + r_mani_status = json.loads(fh.read()) + except (IOError, json.JSONDecodeError): + logger.debug('Could not read %s', r_mani_status_path) + r_mani_status = dict() + r_last_fetched = r_mani_status.get('last-fetched', 0) + config_last_modified = r_mani_status.get('config-last-modified', 0) + if config_last_modified != config.last_modified: + nomtime = True r_mani_url = config['remote'].get('manifest') logger.info(' manifest: fetching %s', r_mani_url) if r_mani_url.find('file:///') == 0: @@ -925,11 +925,6 @@ def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False): else: logger.debug('No 
repositories need purging') - if q_mani.empty(): - logger.info(' manifest: no new updates') - else: - logger.info(' manifest: %s new updates', q_mani.qsize()) - def update_manifest(config, entries): manifile = config['core'].get('manifest') @@ -1091,6 +1086,13 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False): dw.start() dws.append(dw) + if not q_pull.empty() and len(pws) < pull_threads: + pw = mp.Process(target=pull_worker, args=(config, q_pull, q_spa, q_done)) + pw.daemon = True + pw.start() + pws.append(pw) + logger.info(' worker: started (%s running)', len(pws)) + # Any new results? try: while True: @@ -1119,17 +1121,24 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False): # Anything new in the manifest queue? try: - gitdir, repoinfo, action = q_mani.get_nowait() - if (gitdir, action) in actions: - logger.debug('already in the queue: %s, %s', gitdir, action) - continue - if action == 'pull' and (gitdir, 'init') in actions: - logger.debug('already in the queue as init: %s, %s', gitdir, action) - continue + new_updates = 0 + while True: + gitdir, repoinfo, action = q_mani.get_nowait() + if (gitdir, action) in actions: + logger.debug('already in the queue: %s, %s', gitdir, action) + continue + if action == 'pull' and (gitdir, 'init') in actions: + logger.debug('already in the queue as init: %s, %s', gitdir, action) + continue + + actions.add((gitdir, action)) + q_todo.put((gitdir, repoinfo, action)) + new_updates += 1 + logger.debug('queued: %s, %s', gitdir, action) + + if new_updates: + logger.info(' manifest: %s new updates', new_updates) - actions.add((gitdir, action)) - q_todo.put((gitdir, repoinfo, action)) - logger.debug('queued: %s, %s', gitdir, action) except queue.Empty: pass @@ -1146,6 +1155,7 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False): # Finally, deal with q_todo try: gitdir, repoinfo, q_action = q_todo.get_nowait() + logger.debug('main_thread: got %s/%s from q_todo', gitdir, 
q_action) except queue.Empty: if q_mani.empty() and q_done.empty(): if not len(pws): @@ -1181,13 +1191,6 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False): if gitdir == loopmark: loopmark = None - if len(pws) < pull_threads: - pw = mp.Process(target=pull_worker, args=(config, q_pull, q_spa, q_done)) - pw.daemon = True - pw.start() - pws.append(pw) - logger.info(' worker: started (%s running)', len(pws)) - if q_action == 'objstore_migrate': # Add forkgroup to busy, so we don't run any pulls until it's done busy.add(repoinfo['forkgroup']) |