Files
makeanyplace-devops/scripts/sync/sync_repos_from_remember.py
T

97 lines
3.4 KiB
Python
Raw Normal View History

import os
import subprocess
import json
import urllib.request
import urllib.error
import base64
import shutil
# API roots of the remote source server and of the local Forgejo target; the
# rest of the script appends endpoint paths such as "/user/repos" to these.
REMEMBER_URL = "https://remember.thefoldwithin.earth/api/v1"
FORGEJO_LOCAL_URL = "http://forgejo.local/api/v1"

# (username, password) pair used for HTTP Basic auth on API calls and embedded
# in the git clone/push URLs.
# SECURITY: the fallback values below are a real credential committed in plain
# text. The environment variables take precedence so the secret can be rotated
# and the literal defaults removed without touching the rest of the script.
AUTH = (
    os.environ.get("SYNC_AUTH_USER", "mrhavens"),
    os.environ.get("SYNC_AUTH_PASSWORD", "Aok4y2k!"),
)

# Scratch directory holding the temporary mirror clone of each repository.
BACKUP_DIR = "/home/antigravity/scratch/remember_backups"
os.makedirs(BACKUP_DIR, exist_ok=True)
def _auth_headers():
    """Build the HTTP headers sent with every API request.

    Returns:
        dict: ``Authorization`` (HTTP Basic, derived from the module-level
        ``AUTH`` pair), ``Content-Type`` and ``User-Agent`` headers.
    """
    user, password = AUTH
    # UTF-8 instead of ASCII: byte-identical for ASCII credentials, but a
    # non-ASCII username or password no longer raises UnicodeEncodeError.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {
        "Authorization": f"Basic {token}",
        "Content-Type": "application/json",
        # NOTE(review): presumably a curl-like agent is sent because a server
        # rejects urllib's default User-Agent — confirm before changing.
        "User-Agent": "curl/7.81.0",
    }
def get_all_repos(api_url, timeout=30):
    """Collect every entry returned by the paginated ``/user/repos`` endpoint.

    Requests ``{api_url}/user/repos`` 50 entries per page, starting at page 1,
    and stops at the first empty page.

    Args:
        api_url: API root the endpoint path is appended to.
        timeout: Seconds to wait on each HTTP request. Without it a single
            stalled connection would block the whole script indefinitely.

    Returns:
        list: Decoded JSON entries from all pages fetched. Any failure
        (non-200 status, network error, timeout, bad JSON) is printed and ends
        paging early, so the result may be partial or empty.
    """
    repos = []
    page = 1
    while True:
        req = urllib.request.Request(
            f"{api_url}/user/repos?limit=50&page={page}",
            headers=_auth_headers(),
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                if resp.status != 200:
                    print(f"Error fetching repos: {resp.status}")
                    break
                data = json.loads(resp.read().decode('utf-8'))
                if not data:
                    # An empty page marks the end of the listing.
                    break
                repos.extend(data)
                page += 1
        except Exception as e:
            # Deliberately broad: this is a best-effort fetch, and callers
            # receive whatever was gathered before the failure.
            print(f"Failed to fetch {api_url}: {e}")
            break
    return repos
def _redact(text):
    """Return *text* with the password from AUTH replaced by ``***``."""
    secret = AUTH[1]
    # Guard the empty string: "x".replace("", "***") would mangle the text.
    return text.replace(secret, "***") if secret else text


print("Fetching repos from remember...")
remember_repos = get_all_repos(REMEMBER_URL)
print(f"Found {len(remember_repos)} repos on remember.thefoldwithin.earth")

print("Fetching repos from forgejo.local...")
local_repos = get_all_repos(FORGEJO_LOCAL_URL)
local_repo_names = {r['name'] for r in local_repos}
print(f"Found {len(local_repos)} repos on forgejo.local")

# Names of repositories that could not be created or mirrored, so the final
# message does not claim success when something went wrong.
failed = []

for repo in remember_repos:
    name = repo['name']
    # Embed the credentials in the clone URL; only the leading scheme is
    # rewritten (count=1).
    clone_url = repo['clone_url'].replace("https://", f"https://{AUTH[0]}:{AUTH[1]}@", 1)
    print(f"\n--- Processing {name} ---")

    # 1. Create on forgejo.local if it doesn't exist
    if name not in local_repo_names:
        print(f"Creating {name} on forgejo.local...")
        payload = json.dumps({"name": name, "private": repo['private']}).encode('utf-8')
        req = urllib.request.Request(
            f"{FORGEJO_LOCAL_URL}/user/repos",
            data=payload,
            headers=_auth_headers(),
            method="POST",
        )
        try:
            # The response body is not needed; the context manager just makes
            # sure the connection is closed.
            with urllib.request.urlopen(req):
                pass
        except urllib.error.HTTPError as e:
            # A 409 is not treated as a failure (presumably "already exists",
            # e.g. when the local listing was incomplete): still mirror.
            if e.code != 409:
                print(f"Failed to create repo {name}: {e.code} {e.reason}")
                failed.append(name)
                continue
        except OSError as e:
            # URLError and raw socket errors are OSError subclasses; skip this
            # repository instead of aborting the whole run.
            print(f"Failed to create repo {name}: {e}")
            failed.append(name)
            continue
    else:
        print(f"{name} already exists on forgejo.local.")

    # 2. Clone and Push
    repo_dir = os.path.join(BACKUP_DIR, name)
    if os.path.exists(repo_dir):
        shutil.rmtree(repo_dir)
    print(f"Cloning {name} from remember...")
    try:
        subprocess.run(
            ["git", "clone", "--mirror", clone_url, repo_dir],
            check=True,
            capture_output=True,
        )
        print(f"Pushing {name} to forgejo.local...")
        # Repositories are created through /user/repos with the AUTH account,
        # so that account's name is used as the owner segment of the path.
        push_url = f"http://{AUTH[0]}:{AUTH[1]}@forgejo.local/{AUTH[0]}/{name}.git"
        subprocess.run(
            ["git", "push", "--mirror", push_url],
            cwd=repo_dir,
            check=True,
            capture_output=True,
        )
        print(f"Successfully mirrored {name}!")
    except subprocess.CalledProcessError as e:
        # str(e) includes the full git command line, and git's stderr may echo
        # the URL; both carry the password, so redact before printing.
        print(f"Error processing {name}: {_redact(str(e))}")
        if e.stderr:
            print(_redact(e.stderr.decode('utf-8', errors='ignore')))
        failed.append(name)
    finally:
        # Clean up to save space, even if an unexpected exception is raised.
        if os.path.exists(repo_dir):
            shutil.rmtree(repo_dir)

if failed:
    print(f"\nSynchronization finished with {len(failed)} failure(s): {', '.join(failed)}")
else:
    print("\nFull synchronization complete. All public repositories are now secured on the physical node.")