Staging instance, all changes can be removed at any time

Skip to content
Snippets Groups Projects

list nodes sorted by indegree

The snippet can be accessed without any authentication.
Authored by vlorentz
Edited
src/main.rs 5.48 KiB
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use clap::Parser;
use dsi_progress_logger::{ProgressLog, ProgressLogger};
use rayon::prelude::*;
use serde::Serialize;

use swh_graph::graph::*;
use swh_graph::java_compat::mph::gov::GOVMPH;
use swh_graph::SWHID;

/// Lists the nodes of a graph together with their indegree, sorted by indegree, as CSV on
/// stdout.
#[derive(Parser, Debug)]
struct Args {
    /// Path of the graph to load
    graph_path: PathBuf,
    /// Maximum number of SWHIDs to return
    #[arg(long)]
    max: Option<usize>,
}

pub fn main() -> Result<()> {
    let args = Args::parse();

    stderrlog::new()
        .verbosity(2)
        .timestamp(stderrlog::Timestamp::Second)
        .init()
        .context("While Initializing the stderrlog")?;

    log::info!("Loading graph");
    let graph = swh_graph::graph::load_bidirectional(args.graph_path)
        .context("Could not load graph")?
        .init_properties()
        .load_properties(|props| props.load_maps::<GOVMPH>())
        .context("Could not load maps")?;
    log::info!("Graph loaded.");

    match args.max {
        None => main_unbounded(&graph),
        Some(max) => main_bounded(&graph, max),
    }
}

/// Writes the `max` nodes with the highest indegree to stdout as CSV (`swhid,indegree`),
/// in decreasing order of indegree (the same order as `main_unbounded`).
///
/// Each worker thread keeps its own heap of at most `max` candidates while all nodes are
/// traversed. The per-thread heaps are then merged into a single heap of at most `max`
/// entries, which is written out.
///
/// # Errors
///
/// Returns an error if a record cannot be serialized or the output cannot be flushed.
fn main_bounded<G>(graph: &G, max: usize) -> Result<()>
where
    G: SwhBackwardGraph + SwhGraphWithProperties + Send + Sync + 'static,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    use std::cmp::Reverse;

    // Number of nodes between two progress-logger updates.
    const PROGRESS_STEP: usize = 1 << 20;

    // `max` comes from the command line. It may exceed the number of nodes, or even be
    // `usize::MAX`, where `max + 1` would overflow. Never reserve more than can be used.
    let heap_capacity = max.min(graph.num_nodes()).saturating_add(1);

    // One bounded heap per worker thread, so the traversal itself needs no locking.
    // Entries are `(Reverse(indegree), node)`. `BinaryHeap` is a max-heap, so `pop()`
    // removes the entry with the *lowest* indegree (and, among equal indegrees, the
    // highest node id).
    let heaps = thread_local::ThreadLocal::new();

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Traversing nodes...");
    let pl = Arc::new(Mutex::new(pl));
    swh_graph::utils::shuffle::par_iter_shuffled_range(0..graph.num_nodes()).for_each_init(
        || heaps.get_or(|| RefCell::new(BinaryHeap::with_capacity(heap_capacity))).borrow_mut(),
        |heap, node| {
            if node % PROGRESS_STEP == 0 {
                pl.lock().unwrap().update_with_count(PROGRESS_STEP)
            }

            heap.push((Reverse(graph.indegree(node)), node));
            // Keep only the `max` highest-indegree nodes seen so far by this thread.
            if heap.len() > max {
                heap.pop();
            }
        },
    );
    pl.lock().unwrap().done();

    let heaps: Vec<_> = heaps.into_iter().map(RefCell::into_inner).collect();
    let mut pl = ProgressLogger::default();
    pl.item_name("heap");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(heaps.len()));
    pl.start("Merging heaps...");
    let pl = Arc::new(Mutex::new(pl));
    let heap = heaps
        .into_par_iter()
        .reduce_with(|mut heap1, mut heap2| {
            heap1.append(&mut heap2);
            // Evict the lowest-indegree entries until at most `max` remain.
            while heap1.len() > max {
                heap1.pop();
            }
            pl.lock().unwrap().update();
            heap1
        })
        .unwrap_or_else(BinaryHeap::new);
    pl.lock().unwrap().done();

    // Ascending `(Reverse(indegree), node)` order is descending indegree order. That matches
    // the order in which `main_unbounded` writes nodes. Popping the max-heap instead would
    // have emitted the lowest indegree first.
    let top_nodes = heap.into_sorted_vec();

    let mut writer = csv::WriterBuilder::new()
        .has_headers(true)
        .terminator(csv::Terminator::CRLF)
        .from_writer(std::io::stdout());

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    // Fewer than `max` when the graph has fewer nodes than that.
    pl.expected_updates(Some(top_nodes.len()));
    pl.start("Writing results");
    for (Reverse(indegree), node) in top_nodes {
        let swhid = graph.properties().swhid(node);
        writer
            .serialize(OutputRecord { swhid, indegree })
            .context("Could not serialize record")?;
        pl.light_update();
    }
    pl.done();

    // `csv::Writer` also flushes on drop, but silently discards any error there
    // (e.g. a closed pipe), so flush explicitly.
    writer.flush().context("Could not flush output")?;

    Ok(())
}

/// Writes every node of `graph` to stdout as CSV (`swhid,indegree`), in decreasing order of
/// indegree.
///
/// All indegrees are first computed into a vector indexed by node id. That vector is used to
/// sort the node ids and is then freed. Each record's indegree is recomputed from the graph
/// while writing, so that only the sorted node array stays allocated during output.
///
/// # Errors
///
/// Returns an error if a record cannot be serialized or the output cannot be flushed.
fn main_unbounded<G>(graph: &G) -> Result<()>
where
    G: SwhBackwardGraph + SwhGraphWithProperties + Send + Sync + 'static,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // Number of nodes between two progress-logger updates.
    const PROGRESS_STEP: usize = 1 << 20;

    log::info!("Allocating node array...");
    let mut nodes: Vec<_> = (0..graph.num_nodes()).into_par_iter().collect();

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Computing indegrees...");
    let pl = Arc::new(Mutex::new(pl));
    // `indegrees[node]` must hold the indegree of `node`, so iterate over the node range in
    // order. `collect()` preserves the iterator's order, so collecting from a shuffled range
    // (as `par_iter_shuffled_range` yields) would store indegrees at the wrong positions.
    let indegrees: Vec<_> = (0..graph.num_nodes())
        .into_par_iter()
        .map(|node| {
            if node % PROGRESS_STEP == 0 {
                pl.lock().unwrap().update_with_count(PROGRESS_STEP)
            }
            graph.indegree(node)
        })
        .collect();
    pl.lock().unwrap().done();

    log::info!("Sorting nodes...");
    nodes.par_sort_unstable_by_key(|&node| indegrees[node]);
    log::info!("Done sorting.");
    // Free the indegree array before writing; indegrees are recomputed per record below.
    drop(indegrees);

    let mut writer = csv::WriterBuilder::new()
        .has_headers(true)
        .terminator(csv::Terminator::CRLF)
        .from_writer(std::io::stdout());

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(graph.num_nodes()));
    pl.start("Writing results");
    // `nodes` is sorted by ascending indegree; walk it backwards to output highest first.
    for node in nodes.into_iter().rev() {
        let swhid = graph.properties().swhid(node);
        writer
            .serialize(OutputRecord {
                swhid,
                indegree: graph.indegree(node),
            })
            .context("Could not serialize record")?;
        pl.light_update();
    }
    pl.done();

    // `csv::Writer` also flushes on drop, but silently discards any error there
    // (e.g. a closed pipe), so flush explicitly.
    writer.flush().context("Could not flush output")?;

    Ok(())
}

/// One row of the CSV output: a node's SWHID and its indegree.
///
/// The CSV writers are built with `has_headers(true)` and serialize this struct directly.
/// The field names and their declaration order therefore determine the output columns
/// (`swhid,indegree`).
#[derive(Serialize)]
struct OutputRecord {
    /// SWHID of the node, as returned by `graph.properties().swhid(node)`.
    swhid: SWHID,
    /// Indegree of the node, as returned by `graph.indegree(node)`.
    indegree: usize,
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment