chunked local evaluation #429

Draft: wants to merge 2 commits into master
12 changes: 6 additions & 6 deletions default.nix
@@ -11,20 +11,20 @@ let
withNom
&& (builtins.tryEval (builtins.elem buildPlatform.system pkgs.ghc.meta.platforms)).value or false;
in
python3.pkgs.buildPythonApplication {
python3Packages.buildPythonApplication {
name = "nixpkgs-review";
src = ./.;
format = "pyproject";
nativeBuildInputs = [ installShellFiles ] ++ lib.optional withAutocomplete python3.pkgs.argcomplete;
propagatedBuildInputs = [ python3.pkgs.argcomplete ];
nativeBuildInputs = [ installShellFiles ] ++ lib.optional withAutocomplete python3Packages.argcomplete;
propagatedBuildInputs = [ python3Packages.argcomplete ];

nativeCheckInputs = [
python3.pkgs.setuptools
python3.pkgs.pylint
python3Packages.setuptools
python3Packages.pylint
glibcLocales

# needed for interactive unittests
python3.pkgs.pytest
python3Packages.pytest
pkgs.nixVersions.stable or nix_2_4
git
] ++ lib.optional withSandboxSupport bubblewrap ++ lib.optional withNom' nix-output-monitor;
62 changes: 62 additions & 0 deletions nixpkgs_review/nix/parallel-eval.nix
@@ -0,0 +1,62 @@
/*
Invocation; note that the number of processes spawned is four times
the number of cores -- this helps in two ways:
1. Keeping cores busy while I/O operations are in flight
2. Since the amount of time needed for the jobs is *not* balanced
this minimizes the "tail latency" for the very last job to finish
(on one core) by making the job size smaller.
*/
# see pkgs/top-level/nohydra
{
checkMeta,
includeBroken ? true,
path,
systems,
localSystem,
myChunk,
numChunks,
attrPathFile,
}: let
pkgs = import <nixpkgs> {
system = localSystem;
};
inherit (pkgs) lib;

attrPaths = builtins.fromJSON (builtins.readFile attrPathFile);
chunkSize = (lib.length attrPaths) / numChunks;
myPaths = let
dropped = lib.drop (chunkSize * myChunk) attrPaths;
in
if myChunk == numChunks - 1
then dropped
else lib.take chunkSize dropped;

unfiltered = import (path + "/pkgs/top-level/release-outpaths.nix") {
inherit
checkMeta
path
includeBroken
systems
;
};

f = i: m: a:
lib.mapAttrs (
name: values:
if a ? ${name}
then
if lib.any (value: lib.length value <= i + 1) values
then a.${name}
else f (i + 1) values a.${name}
else null
) (lib.groupBy (a: lib.elemAt a i) m);

filtered = f 0 myPaths unfiltered;

recurseEverywhere = val:
if lib.isDerivation val || !(lib.isAttrs val)
then val
else (builtins.mapAttrs (_: v: recurseEverywhere v) val) // {recurseForDerivations = true;};
in
recurseEverywhere filtered
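
The expression above does two things: it selects this process's slice of the attribute-path list (equal-sized chunks, with the final chunk also absorbing the integer-division remainder via lib.drop/lib.take), and it folds the selected paths back into a nested attribute tree with the recursive f over lib.groupBy. A minimal Python sketch of both steps follows; the function names and sample data are illustrative assumptions, and the tree builder is simplified to return None at the leaves instead of the corresponding nixpkgs subtree.

from typing import Any


def select_chunk(attr_paths: list[list[str]], num_chunks: int, my_chunk: int) -> list[list[str]]:
    # chunkSize = (lib.length attrPaths) / numChunks
    chunk_size = len(attr_paths) // num_chunks
    # dropped = lib.drop (chunkSize * myChunk) attrPaths
    dropped = attr_paths[chunk_size * my_chunk:]
    if my_chunk == num_chunks - 1:
        return dropped  # the last chunk also keeps the remainder
    return dropped[:chunk_size]  # lib.take chunkSize dropped


def prefix_tree(paths: list[list[str]], i: int = 0) -> dict[str, Any]:
    # Rough analogue of f: group the selected paths by their i-th component and
    # recurse; a group whose path ends at this depth becomes a leaf.
    groups: dict[str, list[list[str]]] = {}
    for path in paths:
        groups.setdefault(path[i], []).append(path)
    return {
        name: None if any(len(p) <= i + 1 for p in members) else prefix_tree(members, i + 1)
        for name, members in groups.items()
    }


# 10 paths split into 4 chunks: chunk_size is 2, so the last chunk gets 4 entries.
paths = [["python312Packages", "numpy"], ["hello"]] + [[f"pkg{n}"] for n in range(8)]
print(select_chunk(paths, num_chunks=4, my_chunk=3))
print(prefix_tree([["python312Packages", "numpy"], ["hello"]]))
# -> {'python312Packages': {'numpy': None}, 'hello': None}
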
158 changes: 109 additions & 49 deletions nixpkgs_review/review.py
@@ -17,7 +17,7 @@
from .github import GithubClient
from .nix import Attr, nix_build, nix_eval, nix_shell
from .report import Report
from .utils import System, current_system, info, sh, system_order_key, warn
from .utils import ROOT, System, current_system, info, sh, system_order_key, warn

# keep up to date with `supportedPlatforms`
# https://github.com/NixOS/ofborg/blob/cf2c6712bd7342406e799110e7cd465aa250cdca/ofborg/src/outpaths.nix#L12
@@ -201,8 +201,10 @@ def build_commit(
# TODO: nix-eval-jobs ?
base_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
systems=self.systems,
local_system=self.local_system,
allow=self.allow,
nixpkgs_path=str(self.builddir.worktree_dir),
n_threads=self.num_parallel_evals,
)

@@ -216,8 +218,10 @@ def build_commit(
# TODO: nix-eval-jobs ?
merged_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
systems=self.systems,
local_system=self.local_system,
allow=self.allow,
nixpkgs_path=str(self.builddir.worktree_dir),
n_threads=self.num_parallel_evals,
check_meta=True,
)
@@ -427,68 +431,124 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]:


def _list_packages_system(
system: System,
nix_path: str,
chunk_id: int,
num_chunks: int,
systems: set[System],
local_system: System,
nixpkgs_path: str,
paths_json_filename: str,
paths_filename: str,
allow: AllowedFeatures,
check_meta: bool = False,
) -> list[Package]:
cmd = [
) -> list[str]:
cmd: list[str] = [
"nix-env",
"--extra-experimental-features",
"" if allow.url_literals else "no-url-literals",
"--option",
"system",
system,
"-f",
"<nixpkgs>",
"--nix-path",
nix_path,
"-qaP",
"--xml",
"--no-name",
"--out-path",
"--show-trace",
"--allow-import-from-derivation"
if allow.ifd
else "--no-allow-import-from-derivation",
]
if check_meta:
cmd.append("--meta")
cmd.extend(["-f", str(ROOT.joinpath("nix/parallel-eval.nix"))])

cmd.extend(
["--arg", "systems", f"[{", ".join([f'"{system}"' for system in systems])}]"]
)
cmd.extend(["--arg", "checkMeta", "true"])
cmd.extend(["--arg", "includeBroken", "true"])
cmd.extend(["--argstr", "localSystem", local_system])
cmd.extend(["--arg", "attrPathFile", paths_json_filename])
cmd.extend(["--arg", "path", nixpkgs_path])
cmd.extend(["--arg", "numChunks", str(num_chunks)])
cmd.extend(["--arg", "myChunk", str(chunk_id)])

# cmd.extend([">", paths_filename])

info("$ " + " ".join(cmd))
with tempfile.NamedTemporaryFile(mode="w") as tmp:
res = subprocess.run(cmd, stdout=tmp)
if res.returncode != 0:
raise NixpkgsReviewError(
f"Failed to list packages: nix-env failed with exit code {res.returncode}"
)
tmp.flush()
with open(tmp.name, encoding="utf-8") as f:
return parse_packages_xml(f)
res = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
text=True,
)
if res.returncode != 0:
raise NixpkgsReviewError(
f"Failed to list packages: nix-env failed with exit code {res.returncode}"
)
results: list[str] = []
for line in (raw for raw in res.stdout.split("\n") if raw.strip()):  # skip blank trailing lines
# <package_path>.<system> (python312Packages.numpy.x86_64-linux)
results.append(line.split()[0].strip())

return results


def list_packages(
nix_path: str,
systems: set[System],
local_system: System,
allow: AllowedFeatures,
nixpkgs_path: str,
n_threads: int,
check_meta: bool = False,
) -> dict[System, list[Package]]:
results: dict[System, list[Package]] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
future_to_system = {
executor.submit(
_list_packages_system,
system=system,
nix_path=nix_path,
allow=allow,
check_meta=check_meta,
): system
for system in systems
}
for future in concurrent.futures.as_completed(future_to_system):
system = future_to_system[future]
results[system] = future.result()
with tempfile.TemporaryDirectory() as temp_dir:
paths_json_filename: str = os.path.join(temp_dir, "paths.json")
with open(paths_json_filename, mode="w") as paths_json:
subprocess.run(
args=[
"nix-instantiate",
"--eval",
"--strict",
"--json",
"--arg",
"enableWarnings",
"false",
f"{nixpkgs_path}/pkgs/top-level/release-attrpaths-superset.nix",
"-A",
"paths",
],
stdout=paths_json,
stderr=subprocess.DEVNULL,
check=True,
)

return results
paths_filename: str = os.path.join(temp_dir, "paths")
num_chunks: int = 4 * n_threads
results: dict[System, list[Package]] = {system: [] for system in systems}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
futures = [
executor.submit(
_list_packages_system,
chunk_id=chunk_id,
systems=systems,
# nix_path=nix_path,
local_system=local_system,
nixpkgs_path=nixpkgs_path,
paths_json_filename=paths_json_filename,
paths_filename=paths_filename,
num_chunks=num_chunks,
allow=allow,
check_meta=check_meta,
)
for chunk_id in range(num_chunks)
]
for future in concurrent.futures.as_completed(futures):
for result in future.result():
# result = "python312Packages.numpy.x86_64-linux"

# ["python312Packages", "numpy", "x86_64-linux"]
splitted_result: list[str] = result.split(".")

# "x86_64-linux"
system = splitted_result.pop()

# "python312Packages.numpy"
path: str = ".".join(splitted_result)

# TODO: create a Package object
results[system].append(path)

return results


def package_attrs(
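
Taken together, the diff implements this flow: enumerate the attribute-path superset once with nix-instantiate, fan out 4 * n_threads nix-env processes that each evaluate one chunk through parallel-eval.nix, then bucket the returned attribute paths by system. The condensed sketch below restates that flow under stated assumptions: the helper names are hypothetical, only the chunk-related nix-env arguments are shown (checkMeta, includeBroken, localSystem and the experimental-feature flags are left out), and the systems list is rendered as a plain space-separated Nix list.

import concurrent.futures
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path


def evaluate_chunk(
    chunk_id: int, num_chunks: int, nixpkgs: str, attr_path_file: str, systems: list[str]
) -> list[str]:
    # One nix-env process per chunk, condensed from _list_packages_system above.
    cmd = [
        "nix-env", "-qaP", "--no-name", "--show-trace",
        "-f", "nix/parallel-eval.nix",  # the expression added by this PR
        "--arg", "path", nixpkgs,
        "--arg", "attrPathFile", attr_path_file,
        "--arg", "systems", "[ " + " ".join(f'"{s}"' for s in systems) + " ]",
        "--arg", "numChunks", str(num_chunks),
        "--arg", "myChunk", str(chunk_id),
    ]
    out = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE).stdout
    # Each non-empty line starts with an entry such as python312Packages.numpy.x86_64-linux.
    return [line.split()[0] for line in out.splitlines() if line.strip()]


def list_attrs(nixpkgs: str, systems: list[str], n_threads: int) -> dict[str, list[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        attr_path_file = str(Path(tmp) / "paths.json")
        with open(attr_path_file, "w") as f:
            # Enumerate every candidate attribute path once, up front.
            subprocess.run(
                ["nix-instantiate", "--eval", "--strict", "--json",
                 f"{nixpkgs}/pkgs/top-level/release-attrpaths-superset.nix", "-A", "paths"],
                stdout=f, check=True,
            )
        # Four chunks per worker keep cores busy during I/O waits and shrink the
        # tail latency of the slowest chunk, as the comment in parallel-eval.nix argues.
        num_chunks = 4 * n_threads
        results: dict[str, list[str]] = defaultdict(list)
        with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as pool:
            futures = [
                pool.submit(evaluate_chunk, i, num_chunks, nixpkgs, attr_path_file, systems)
                for i in range(num_chunks)
            ]
            for fut in concurrent.futures.as_completed(futures):
                for entry in fut.result():
                    # "python312Packages.numpy.x86_64-linux" -> ("python312Packages.numpy", "x86_64-linux")
                    attr, _, system = entry.rpartition(".")
                    results[system].append(attr)
        return dict(results)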