227 lines
6.6 KiB
Python
227 lines
6.6 KiB
Python
|
|
from urllib.parse import quote
|
|||
|
|
import json
|
|||
|
|
import subprocess as sub
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
from typing import Iterator, Any, Literal, TypedDict, Optional
|
|||
|
|
from tempfile import NamedTemporaryFile
|
|||
|
|
|
|||
|
|
# Debugging is switched on by setting the DEBUG environment variable to a non-empty value.
debug: bool = bool(os.environ.get("DEBUG"))

# Binaries are referred to by plain strings.
Bin = str

# Arguments handed over as a JSON document in the ARGS environment variable.
args: dict[str, Any] = json.loads(os.environ["ARGS"])

# The binaries to call, looked up by name (e.g. `bins["curl"]`).
bins: dict[str, Bin] = args["binaries"]

# First command line argument: the mode that selects what this script does.
mode: str = sys.argv[1]

# Second command line argument: a JSON document with the input of that mode.
jsonArg: dict = json.loads(sys.argv[2])

# A command line, produced piece by piece.
Args = Iterator[str]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def log(msg: str) -> None:
    """Write `msg`, followed by a newline, to stderr."""
    sys.stderr.write(str(msg) + "\n")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def atomically_write(file_path: str, content: bytes) -> None:
    """Atomically write `content` into `file_path`.

    The content is first written to a temporary file next to `file_path`
    and only moved over the destination once it has been written out and
    closed, so `file_path` never refers to a partially written file.

    Any error raised while writing or renaming (e.g. `OSError`) is passed
    on to the caller after the temporary file has been removed.
    """
    # write to the parent dir, so that it's guaranteed to be on the same filesystem
    tmp = NamedTemporaryFile(dir=os.path.dirname(file_path), delete=False)
    try:
        # Leaving the `with` block flushes and closes the file, so the
        # complete content is in place before the rename makes it visible.
        with tmp:
            tmp.write(content)
        os.replace(tmp.name, file_path)
    except BaseException:
        # Don't leave the temporary file behind, but do report the failure
        # instead of pretending the write succeeded.
        try:
            os.unlink(tmp.name)
        except FileNotFoundError:
            pass
        raise
|
|||
|
|
|
|||
|
|
|
|||
|
|
def curl_github_args(token: str | None, url: str) -> Args:
    """Build the curl command line for a request against the github API.

    Yields the curl binary, then its options, then `url`. An
    `Authorization` header is only added when `token` is non-empty.
    """
    yield bins["curl"]
    # stay quiet unless we are debugging
    options: list[str] = [] if debug else ["--silent"]
    # follow redirects
    options.append("--location")
    if token:
        options.extend(("-H", f"Authorization: token {token}"))
    yield from options
    yield url
|
|||
|
|
|
|||
|
|
|
|||
|
|
def curl_result(output: bytes) -> Any | Literal["not found"]:
|
|||
|
|
"""Parse the curl result of the github API"""
|
|||
|
|
res: Any = json.loads(output)
|
|||
|
|
match res:
|
|||
|
|
case dict(res):
|
|||
|
|
message: str = res.get("message", "")
|
|||
|
|
if "rate limit" in message:
|
|||
|
|
sys.exit("Rate limited by the Github API")
|
|||
|
|
if "Not Found" in message:
|
|||
|
|
return "not found"
|
|||
|
|
# if the result is another type, we can pass it on
|
|||
|
|
return res
|
|||
|
|
|
|||
|
|
|
|||
|
|
def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
    """Build the nix-prefetch-git command line for `url` at revision `version_rev`."""
    yield bins["nix-prefetch-git"]
    # stay quiet unless we are debugging
    if not debug:
        yield "--quiet"
    yield from ("--no-deepClone", "--url", url, "--rev", version_rev)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run_cmd(args: Args) -> bytes:
    """Run the command line `args` and return what it wrote to stdout.

    When debugging is enabled, the full command line is logged first.
    """
    argv = [*args]
    if debug:
        log(str(argv))
    return sub.check_output(argv)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Type alias used to annotate directory paths, which are plain strings.
Dir = str
|
|||
|
|
|
|||
|
|
|
|||
|
|
def fetchRepo() -> None:
|
|||
|
|
"""fetch the given repo and write its nix-prefetch output to the corresponding grammar json file"""
|
|||
|
|
match jsonArg:
|
|||
|
|
case {
|
|||
|
|
"orga": orga,
|
|||
|
|
"repo": repo,
|
|||
|
|
"outputDir": outputDir,
|
|||
|
|
"nixRepoAttrName": nixRepoAttrName,
|
|||
|
|
}:
|
|||
|
|
if repo in args["pinnedGrammars"]:
|
|||
|
|
log(f"Grammar {repo} is pinned, skipping upgrade...")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
token: str | None = os.environ.get("GITHUB_TOKEN", None)
|
|||
|
|
out = run_cmd(
|
|||
|
|
curl_github_args(
|
|||
|
|
token,
|
|||
|
|
url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
|
|||
|
|
)
|
|||
|
|
)
|
|||
|
|
release: str
|
|||
|
|
match curl_result(out):
|
|||
|
|
case "not found":
|
|||
|
|
if "branch" in jsonArg:
|
|||
|
|
branch = jsonArg.get("branch")
|
|||
|
|
release = f"refs/heads/{branch}"
|
|||
|
|
else:
|
|||
|
|
# github sometimes returns an empty list even tough there are releases
|
|||
|
|
log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
|
|||
|
|
release = "HEAD"
|
|||
|
|
case {"tag_name": tag_name}:
|
|||
|
|
release = tag_name
|
|||
|
|
case _:
|
|||
|
|
sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")
|
|||
|
|
|
|||
|
|
log(f"Fetching latest release ({release}) of {orga}/{repo} …")
|
|||
|
|
res = run_cmd(
|
|||
|
|
nix_prefetch_git_args(
|
|||
|
|
url=f"https://github.com/{quote(orga)}/{quote(repo)}",
|
|||
|
|
version_rev=release
|
|||
|
|
)
|
|||
|
|
)
|
|||
|
|
atomically_write(
|
|||
|
|
file_path=os.path.join(
|
|||
|
|
outputDir,
|
|||
|
|
f"{nixRepoAttrName}.json"
|
|||
|
|
),
|
|||
|
|
content=res
|
|||
|
|
)
|
|||
|
|
case _:
|
|||
|
|
sys.exit("input json must have `orga` and `repo` keys")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def fetchOrgaLatestRepos(orga: str) -> set[str]:
|
|||
|
|
"""fetch the latest (100) repos from the given github organization"""
|
|||
|
|
token: str | None = os.environ.get("GITHUB_TOKEN", None)
|
|||
|
|
out = run_cmd(
|
|||
|
|
curl_github_args(
|
|||
|
|
token,
|
|||
|
|
url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
|
|||
|
|
)
|
|||
|
|
)
|
|||
|
|
match curl_result(out):
|
|||
|
|
case "not found":
|
|||
|
|
sys.exit(f"github organization {orga} not found")
|
|||
|
|
case list(repos):
|
|||
|
|
res: list[str] = []
|
|||
|
|
for repo in repos:
|
|||
|
|
name = repo.get("name")
|
|||
|
|
if name:
|
|||
|
|
res.append(name)
|
|||
|
|
return set(res)
|
|||
|
|
case _:
|
|||
|
|
sys.exit("github result was not a list of repos, but {other}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
    """Exit with an error unless every given repo is listed as known or as ignored."""
    accounted_for: set[str] = (
        set(args["knownTreeSitterOrgGrammarRepos"])
        | set(args["ignoredTreeSitterOrgRepos"])
    )
    unknown = latest_github_repos - accounted_for
    if not unknown:
        return
    sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Grammar(TypedDict):
    """The fields describing one grammar repository."""

    nixRepoAttrName: str
    orga: str
    repo: str
    branch: Optional[str]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def printAllGrammarsNixFile() -> None:
    """Write a `default.nix` file that imports all grammars.

    Reads `allGrammars` and `outputDir` from the JSON argument and writes
    `default.nix` into `outputDir`; the file holds one
    `lib.importJSON ./<nixRepoAttrName>.json` entry per grammar.
    """
    # each element is a grammar description itself, not a dict of them
    allGrammars: list[Grammar] = jsonArg["allGrammars"]
    outputDir: Dir = jsonArg["outputDir"]

    def nix_lines() -> Iterator[str]:
        """Yield the lines of the generated nix file."""
        yield "{ lib }:"
        yield "{"
        for grammar in allGrammars:
            n = grammar["nixRepoAttrName"]
            yield f" {n} = lib.importJSON ./{n}.json;"
        yield "}"
        # makes the joined text end with a newline
        yield ""

    atomically_write(
        file_path=os.path.join(
            outputDir,
            "default.nix"
        ),
        content="\n".join(nix_lines()).encode()
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def fetchAndCheckTreeSitterRepos() -> None:
    """Fetch the repos of the tree-sitter organization and check them against the grammars we know."""
    log("fetching list of grammars")
    tree_sitter_repos = fetchOrgaLatestRepos(orga="tree-sitter")

    log("checking the tree-sitter repo list against the grammars we know")
    checkTreeSitterRepos(tree_sitter_repos)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run the function that belongs to the mode given on the command line.
if mode == "fetch-repo":
    fetchRepo()
elif mode == "fetch-and-check-tree-sitter-repos":
    fetchAndCheckTreeSitterRepos()
elif mode == "print-all-grammars-nix-file":
    printAllGrammarsNixFile()
else:
    sys.exit(f"mode {mode} unknown")
|