aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-08-29 20:01:59 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-08-29 20:12:38 -0400
commit7c6fb23d8e70e0b06883373b92f66b580cfff745 (patch)
tree239a75fc125d834fc2f974650c0a739979f1d601
parent7cbe77af6be0f8726cc273ed39251ccc1a4dab8f (diff)
downloadgrokmirror-7c6fb23d8e70e0b06883373b92f66b580cfff745.tar.gz
Move pull worker spawning up in the cycle
Avoid potential problems when q_todo is drained. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-xgrokmirror/pull.py71
1 files changed, 37 insertions, 34 deletions
diff --git a/grokmirror/pull.py b/grokmirror/pull.py
index d3b4534..940b130 100755
--- a/grokmirror/pull.py
+++ b/grokmirror/pull.py
@@ -615,19 +615,8 @@ def write_projects_list(config, manifest):
def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False):
# l_ = local, r_ = remote
l_mani_path = config['core'].get('manifest')
- r_mani_status_path = os.path.join(os.path.dirname(l_mani_path), '.%s.remote' % os.path.basename(l_mani_path))
- try:
- with open(r_mani_status_path, 'r') as fh:
- r_mani_status = json.loads(fh.read())
- except (IOError, json.JSONDecodeError):
- logger.debug('Could not read %s', r_mani_status_path)
- r_mani_status = dict()
- r_last_fetched = r_mani_status.get('last-fetched', 0)
- config_last_modified = r_mani_status.get('config-last-modified', 0)
- if config_last_modified != config.last_modified:
- nomtime = True
-
r_mani_cmd = config['remote'].get('manifest_command')
+
if r_mani_cmd:
if not os.access(r_mani_cmd, os.X_OK):
logger.critical('Remote manifest command is not executable: %s', r_mani_cmd)
@@ -658,6 +647,17 @@ def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False):
raise IOError('Empty manifest returned by %s' % r_mani_cmd)
else:
+ r_mani_status_path = os.path.join(os.path.dirname(l_mani_path), '.%s.remote' % os.path.basename(l_mani_path))
+ try:
+ with open(r_mani_status_path, 'r') as fh:
+ r_mani_status = json.loads(fh.read())
+ except (IOError, json.JSONDecodeError):
+ logger.debug('Could not read %s', r_mani_status_path)
+ r_mani_status = dict()
+ r_last_fetched = r_mani_status.get('last-fetched', 0)
+ config_last_modified = r_mani_status.get('config-last-modified', 0)
+ if config_last_modified != config.last_modified:
+ nomtime = True
r_mani_url = config['remote'].get('manifest')
logger.info(' manifest: fetching %s', r_mani_url)
if r_mani_url.find('file:///') == 0:
@@ -925,11 +925,6 @@ def fill_todo_from_manifest(config, q_mani, nomtime=False, forcepurge=False):
else:
logger.debug('No repositories need purging')
- if q_mani.empty():
- logger.info(' manifest: no new updates')
- else:
- logger.info(' manifest: %s new updates', q_mani.qsize())
-
def update_manifest(config, entries):
manifile = config['core'].get('manifest')
@@ -1091,6 +1086,13 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False):
dw.start()
dws.append(dw)
+ if not q_pull.empty() and len(pws) < pull_threads:
+ pw = mp.Process(target=pull_worker, args=(config, q_pull, q_spa, q_done))
+ pw.daemon = True
+ pw.start()
+ pws.append(pw)
+ logger.info(' worker: started (%s running)', len(pws))
+
# Any new results?
try:
while True:
@@ -1119,17 +1121,24 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False):
# Anything new in the manifest queue?
try:
- gitdir, repoinfo, action = q_mani.get_nowait()
- if (gitdir, action) in actions:
- logger.debug('already in the queue: %s, %s', gitdir, action)
- continue
- if action == 'pull' and (gitdir, 'init') in actions:
- logger.debug('already in the queue as init: %s, %s', gitdir, action)
- continue
+ new_updates = 0
+ while True:
+ gitdir, repoinfo, action = q_mani.get_nowait()
+ if (gitdir, action) in actions:
+ logger.debug('already in the queue: %s, %s', gitdir, action)
+ continue
+ if action == 'pull' and (gitdir, 'init') in actions:
+ logger.debug('already in the queue as init: %s, %s', gitdir, action)
+ continue
+
+ actions.add((gitdir, action))
+ q_todo.put((gitdir, repoinfo, action))
+ new_updates += 1
+ logger.debug('queued: %s, %s', gitdir, action)
+
+ if new_updates:
+ logger.info(' manifest: %s new updates', new_updates)
- actions.add((gitdir, action))
- q_todo.put((gitdir, repoinfo, action))
- logger.debug('queued: %s, %s', gitdir, action)
except queue.Empty:
pass
@@ -1146,6 +1155,7 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False):
# Finally, deal with q_todo
try:
gitdir, repoinfo, q_action = q_todo.get_nowait()
+ logger.debug('main_thread: got %s/%s from q_todo', gitdir, q_action)
except queue.Empty:
if q_mani.empty() and q_done.empty():
if not len(pws):
@@ -1181,13 +1191,6 @@ def pull_mirror(config, nomtime=False, forcepurge=False, runonce=False):
if gitdir == loopmark:
loopmark = None
- if len(pws) < pull_threads:
- pw = mp.Process(target=pull_worker, args=(config, q_pull, q_spa, q_done))
- pw.daemon = True
- pw.start()
- pws.append(pw)
- logger.info(' worker: started (%s running)', len(pws))
-
if q_action == 'objstore_migrate':
# Add forkgroup to busy, so we don't run any pulls until it's done
busy.add(repoinfo['forkgroup'])