Script to download package.json files from the archive:

Authored by Kumar Shivendu (edited)
code.py 4.20 KiB
import json
import logging
import os
import subprocess
import sys
import concurrent.futures
from pathlib import Path
from typing import List

import requests
from tqdm import tqdm

logging.basicConfig(
    filename='log.txt',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger()

EXPECTED_NUM_LINES = 40_051_216 # Calculated from a different script
SEGMENT_SIZE = 10_000


def download_package_json(download_url: str) -> dict:
    """Download the package.json file at the given URL and extract dependencies and devDependencies."""
    try:
        res = requests.get(download_url, timeout=60)  # timeout (value arbitrary) so a stalled request cannot hang a worker thread
        res.raise_for_status()
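        # The downloaded bytes are zstd-compressed; decompress them by piping through the external `zstd -d` command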
        p = subprocess.Popen(['zstd', '-d'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        package_json_text = p.communicate(input=res.content)[0]
        p.kill()
        package_json = json.loads(package_json_text)
        dependencies = package_json.get('dependencies', {})
        dev_dependencies = package_json.get('devDependencies', {})
        return {"dependencies": dependencies, "devDependencies": dev_dependencies}
    except Exception as e:
        logger.error(f"Error downloading or parsing package.json at {download_url}: {e}")
        return {"dependencies": {}, "devDependencies": {}}


def process_segment(segment: List[str], chunk_idx: int, line_counter: int) -> int:
    """Process a segment of lines from the CSV file."""
    results = []

    SEGMENT_BATCH_SIZE = 100
    batches = [segment[i:i + SEGMENT_BATCH_SIZE] for i in range(0, len(segment), SEGMENT_BATCH_SIZE)]
    for batch in tqdm(batches):
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            future_to_item = {executor.submit(download_package_json, f"https://softwareheritage.s3.amazonaws.com/content/{item}"): item for item in batch}
            for future in concurrent.futures.as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (item, exc))
                else:
                    results.append(data)

    # Write results to file
    chunk_dir = Path(f"./results/{chunk_idx}")
    chunk_dir.mkdir(parents=True, exist_ok=True)
    output_file = chunk_dir / f"{line_counter}.json"
    with output_file.open('w') as f:
        json.dump(results, f)

    return line_counter + len(segment)


def process_map(file_path: str, chunk_idx: int) -> int:
    """Process a map file."""
    chunk_offset = 0

    # Read progress file to resume processing if it exists
    progress_file = Path(f"./progress/{chunk_idx}")
    if progress_file.exists():
        chunk_offset = int(progress_file.read_text().strip())

    # Open zstdcat process
    with subprocess.Popen(['zstdcat', file_path], stdout=subprocess.PIPE) as p:

        # Skip header line
        next(p.stdout)

        # Skip lines until progress
        if chunk_offset > 0:
            print("Skipping till", chunk_offset)

            for _ in range(chunk_offset):
                next(p.stdout)

        # Read line by line
        segment = []
        for line in tqdm(p.stdout, total=EXPECTED_NUM_LINES):
            # Extract the sha1 column (fixed offset within each CSV line)
            sha1 = line[41:81].decode('utf-8')
            segment.append(sha1)

            if len(segment) >= SEGMENT_SIZE:
                chunk_offset = process_segment(segment, chunk_idx, chunk_offset)
                segment = []

                # Save progress
                progress_file.write_text(str(chunk_offset))

        # Process last segment
        if segment:
            chunk_offset = process_segment(segment, chunk_idx, chunk_offset)

        # Save final progress
        progress_file.write_text(str(chunk_offset))

    return chunk_offset


if __name__ == "__main__":
    file_paths = sorted(Path('./maps').glob('*'))
    print("Number of maps:", len(file_paths))

    Path("./results").mkdir(parents=True, exist_ok=True)
    Path("./progress").mkdir(parents=True, exist_ok=True)

    chunk_idx = int(sys.argv[1])
    map_file = file_paths[chunk_idx]

    num_content_from_map = process_map(str(map_file), chunk_idx)
    print(f"Number of package.json from map {map_file} are {num_content_from_map}")
  • @vlorentz As per tqdm, it's downloading 10k files in ~2.73 seconds. I have 30 files of nearly the same size (I'm calling them chunks), each containing ~40M entries. That's 1.2 billion files to process, so it would take about 91 hours (((40_000_000 * 30) / 10_000) * 2.73 / 60 / 60) just to download all the files in batches of 100.

    Do you have any suggestions on how to improve this?

    Edited by Kumar Shivendu
  • vlorentz @vlorentz:

    I don't know how it compares, but the way I do it is to just start a dozen threads that download in parallel.

    91 hours isn't that bad considering you are working at the scale of the whole archive.
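
    Below is a minimal sketch of the long-lived-threads approach described above, as I understand it: a dozen persistent worker threads pulling sha1s from a shared queue, each with its own requests.Session, instead of creating a fresh ThreadPoolExecutor for every 100-item batch. The names (worker, download_all), the queue size, and the timeout are illustrative assumptions, not code from this thread.

        import queue
        import threading

        import requests

        NUM_THREADS = 12  # "a dozen threads"
        BASE_URL = "https://softwareheritage.s3.amazonaws.com/content/"


        def worker(q, results, lock):
            # Each thread keeps its own Session so TCP/TLS connections are reused.
            session = requests.Session()
            while True:
                sha1 = q.get()
                if sha1 is None:  # sentinel: no more work
                    q.task_done()
                    return
                try:
                    res = session.get(BASE_URL + sha1, timeout=30)
                    res.raise_for_status()
                    with lock:
                        # res.content is still zstd-compressed, as in the snippet above
                        results.append((sha1, res.content))
                except Exception as exc:
                    print(f"{sha1} failed: {exc}")
                finally:
                    q.task_done()


        def download_all(sha1s, num_threads=NUM_THREADS):
            q = queue.Queue(maxsize=num_threads * 2)  # bound memory use
            results = []
            lock = threading.Lock()
            threads = [
                threading.Thread(target=worker, args=(q, results, lock), daemon=True)
                for _ in range(num_threads)
            ]
            for t in threads:
                t.start()
            for sha1 in sha1s:
                q.put(sha1)
            for _ in threads:
                q.put(None)  # one sentinel per worker
            q.join()
            return results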

  • How many threads exactly? I'm using 20 at the moment.

  • Ohh, wait. I made a calculation mistake. It's downloading only 100 files in 2.73s. So that's 9100 hours (379 days). Pretty bad.

    Do you have any example script that I can refer to optimize/rewrite my code?
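
    For the record, the corrected back-of-the-envelope estimate works out as follows (same numbers as quoted above):

        total_files = 40_000_000 * 30  # ~40M entries per map, 30 maps -> 1.2 billion files
        batches = total_files / 100    # downloads happen in batches of 100
        seconds = batches * 2.73       # ~2.73 s per 100-file batch
        hours = seconds / 3600         # = 9100 hours
        days = hours / 24              # ~ 379 days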

  • vlorentz @vlorentz:
  • Awesome. Thanks a lot!
