// List nodes sorted by indegree.
// The snippet can be accessed without any authentication.
// Authored by vlorentz (edited).
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use clap::Parser;
use dsi_progress_logger::{ProgressLog, ProgressLogger};
use rayon::prelude::*;
use serde::Serialize;
use swh_graph::graph::*;
use swh_graph::java_compat::mph::gov::GOVMPH;
use swh_graph::SWHID;
// Command-line arguments, parsed with clap's derive API.
// Plain `//` comments are used for the notes below so that the generated
// `--help` text stays exactly as it was.
#[derive(Debug, Parser)]
struct Args {
    // Positional argument: the path handed to `swh_graph::graph::load_bidirectional`.
    graph_path: PathBuf,
    /// Maximum number of SWHIDs to return
    #[arg(long)]
    max: Option<usize>,
}
pub fn main() -> Result<()> {
let args = Args::parse();
stderrlog::new()
.verbosity(2)
.timestamp(stderrlog::Timestamp::Second)
.init()
.context("While Initializing the stderrlog")?;
log::info!("Loading graph");
let graph = swh_graph::graph::load_bidirectional(args.graph_path)
.context("Could not load graph")?
.init_properties()
.load_properties(|props| props.load_maps::<GOVMPH>())
.context("Could not load maps")?;
log::info!("Graph loaded.");
match args.max {
None => main_unbounded(&graph),
Some(max) => main_bounded(&graph, max),
}
}
/// Writes the (at most) `max` nodes with the highest indegree to stdout as
/// CSV, highest indegree first.
///
/// Every worker thread keeps its own bounded heap of `(inverse indegree, node)`
/// pairs while the nodes are traversed in parallel; the per-thread heaps are
/// then merged and trimmed to `max` entries before being written out.
///
/// # Errors
///
/// Returns an error if a record cannot be serialized or if the CSV writer
/// cannot be flushed to stdout.
fn main_bounded<G>(graph: &G, max: usize) -> Result<()>
where
    G: SwhBackwardGraph + SwhGraphWithProperties + Send + Sync + 'static,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // The progress logger is only poked once every PROGRESS_STEP node ids to
    // keep contention on its mutex low.
    const PROGRESS_STEP: usize = 1 << 20;

    let num_nodes = graph.num_nodes();
    // A heap transiently holds `max + 1` entries, but never more than one per
    // node; clamping avoids overflowing or over-allocating on a huge `--max`.
    let heap_capacity = max.min(num_nodes).saturating_add(1);

    let heaps = thread_local::ThreadLocal::new();

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(num_nodes));
    pl.start("Traversing nodes...");
    let pl = Arc::new(Mutex::new(pl));

    swh_graph::utils::shuffle::par_iter_shuffled_range(0..num_nodes).for_each_init(
        || {
            heaps
                .get_or(|| RefCell::new(BinaryHeap::with_capacity(heap_capacity)))
                .borrow_mut()
        },
        |heap, node| {
            if node % PROGRESS_STEP == 0 {
                pl.lock().unwrap().update_with_count(PROGRESS_STEP)
            }
            // Insert the inverse so that BinaryHeap::pop() removes the node with the
            // lowest degree (as BinaryHeap is a max heap)
            heap.push((usize::MAX - graph.indegree(node), node));
            if heap.len() > max {
                heap.pop();
            }
        },
    );
    pl.lock().unwrap().done();

    let heaps: Vec<_> = heaps.into_iter().map(RefCell::into_inner).collect();

    let mut pl = ProgressLogger::default();
    pl.item_name("heap");
    pl.display_memory(true);
    pl.local_speed(true);
    // Reducing n heaps takes n - 1 pairwise merges.
    pl.expected_updates(Some(heaps.len().saturating_sub(1)));
    pl.start("Merging heaps...");
    let pl = Arc::new(Mutex::new(pl));

    let merged = heaps
        .into_par_iter()
        .reduce_with(|mut heap1, mut heap2| {
            heap1.append(&mut heap2);
            while heap1.len() > max {
                heap1.pop();
            }
            pl.lock().unwrap().update();
            heap1
        })
        .unwrap_or_default();
    pl.lock().unwrap().done();

    // Ascending order on the inverse indegree is descending order on the
    // indegree, which matches the order used by `main_unbounded`. Popping the
    // heap instead would emit the lowest indegree first.
    let top_nodes = merged.into_sorted_vec();

    let mut writer = csv::WriterBuilder::new()
        .has_headers(true)
        .terminator(csv::Terminator::CRLF)
        .from_writer(std::io::stdout());

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    // There may be fewer than `max` nodes in the graph.
    pl.expected_updates(Some(top_nodes.len()));
    pl.start("Writing results");

    for (inverse_indegree, node) in top_nodes {
        let indegree = usize::MAX - inverse_indegree;
        let swhid = graph.properties().swhid(node);
        writer
            .serialize(OutputRecord { swhid, indegree })
            .context("Could not serialize record")?;
        pl.light_update();
    }
    // Dropping the writer would flush too, but would swallow any I/O error.
    writer.flush().context("Could not flush output")?;
    pl.done();

    Ok(())
}
/// Writes every node of the graph to stdout as CSV, sorted by decreasing
/// indegree (ties are broken by increasing node id).
///
/// Indegrees are computed in parallel and stored next to their node id, so
/// the sort key always belongs to the node it is attached to regardless of
/// the order in which the parallel iterator yields nodes.
///
/// # Errors
///
/// Returns an error if a record cannot be serialized or if the CSV writer
/// cannot be flushed to stdout.
fn main_unbounded<G>(graph: &G) -> Result<()>
where
    G: SwhBackwardGraph + SwhGraphWithProperties + Send + Sync + 'static,
    <G as SwhGraphWithProperties>::Maps: swh_graph::properties::Maps,
{
    // The progress logger is only poked once every PROGRESS_STEP node ids to
    // keep contention on its mutex low.
    const PROGRESS_STEP: usize = 1 << 20;

    let num_nodes = graph.num_nodes();

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(num_nodes));
    pl.start("Computing indegrees...");
    let pl = Arc::new(Mutex::new(pl));

    // The shuffled iterator is not expected to yield nodes in id order, and
    // `collect` keeps the iterator's order. Collecting bare indegrees and
    // indexing them by node id would therefore pair nodes with the wrong
    // indegree, so each indegree is kept together with its node.
    let mut nodes: Vec<(usize, usize)> =
        swh_graph::utils::shuffle::par_iter_shuffled_range(0..num_nodes)
            .map(|node| {
                if node % PROGRESS_STEP == 0 {
                    pl.lock().unwrap().update_with_count(PROGRESS_STEP)
                }
                (graph.indegree(node), node)
            })
            .collect();
    pl.lock().unwrap().done();

    log::info!("Sorting nodes...");
    nodes.par_sort_unstable_by_key(|&(indegree, node)| (std::cmp::Reverse(indegree), node));
    log::info!("Done sorting.");

    let mut writer = csv::WriterBuilder::new()
        .has_headers(true)
        .terminator(csv::Terminator::CRLF)
        .from_writer(std::io::stdout());

    let mut pl = ProgressLogger::default();
    pl.item_name("node");
    pl.display_memory(true);
    pl.local_speed(true);
    pl.expected_updates(Some(num_nodes));
    pl.start("Writing results");

    for (indegree, node) in nodes {
        let swhid = graph.properties().swhid(node);
        writer
            .serialize(OutputRecord { swhid, indegree })
            .context("Could not serialize record")?;
        pl.light_update();
    }
    // Dropping the writer would flush too, but would swallow any I/O error.
    writer.flush().context("Could not flush output")?;
    pl.done();

    Ok(())
}
/// One record of the CSV output written to stdout.
///
/// The field names are what `serde` exposes to the CSV writer, so renaming a
/// field changes the output format.
#[derive(Serialize)]
struct OutputRecord {
    /// SWHID of the node, as returned by `graph.properties().swhid(node)`.
    swhid: SWHID,
    /// Indegree of the node, as returned by `graph.indegree(node)`.
    indegree: usize,
}
// Please register or sign in to comment