use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::rc::Rc; use std::time::Duration; use anyhow::{Error, anyhow}; use common::EventUtils; use gpui::{App, AppContext, Context, Entity, Global, Task, Window}; use nostr_sdk::prelude::*; use smallvec::{SmallVec, smallvec}; use state::{Announcement, BOOTSTRAP_RELAYS, NostrRegistry, TIMEOUT}; mod person; pub use person::*; pub fn init(window: &mut Window, cx: &mut App) { PersonRegistry::set_global(cx.new(|cx| PersonRegistry::new(window, cx)), cx); } struct GlobalPersonRegistry(Entity); impl Global for GlobalPersonRegistry {} #[derive(Debug, Clone)] enum Dispatch { Person(Box), Announcement(Box), Relays(Box), } /// Person Registry #[derive(Debug)] pub struct PersonRegistry { /// Collection of all persons (user profiles) persons: HashMap>, /// Set of public keys that have been seen seens: Rc>>, /// Sender for requesting metadata sender: flume::Sender, /// Tasks for asynchronous operations tasks: SmallVec<[Task<()>; 4]>, } impl PersonRegistry { /// Retrieve the global person registry pub fn global(cx: &App) -> Entity { cx.global::().0.clone() } /// Set the global person registry instance fn set_global(state: Entity, cx: &mut App) { cx.set_global(GlobalPersonRegistry(state)); } /// Create a new person registry instance fn new(window: &mut Window, cx: &mut Context) -> Self { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); // Channel for communication between nostr and gpui let (tx, rx) = flume::bounded::(100); let (mta_tx, mta_rx) = flume::unbounded::(); let mut tasks = smallvec![]; tasks.push( // Handle nostr notifications cx.background_spawn({ let client = client.clone(); async move { Self::handle_notifications(&client, &tx).await; } }), ); tasks.push( // Handle metadata requests cx.background_spawn({ let client = client.clone(); async move { Self::handle_requests(&client, &mta_rx).await; } }), ); tasks.push( // Update GPUI state cx.spawn(async move |this, cx| { while let Ok(event) = rx.recv_async().await { this.update(cx, |this, cx| { match event { Dispatch::Person(person) => { this.insert(*person, cx); } Dispatch::Announcement(event) => { this.set_announcement(&event, cx); } Dispatch::Relays(event) => { this.set_messaging_relays(&event, cx); } }; }) .ok(); } }), ); // Load all user profiles from the database cx.defer_in(window, |this, _window, cx| { this.load(cx); }); Self { persons: HashMap::new(), seens: Rc::new(RefCell::new(HashSet::new())), sender: mta_tx, tasks, } } /// Handle nostr notifications async fn handle_notifications(client: &Client, tx: &flume::Sender) { let mut notifications = client.notifications(); let mut processed: HashSet = HashSet::new(); while let Some(notification) = notifications.next().await { let ClientNotification::Message { message, .. } = notification else { // Skip if the notification is not a message continue; }; if let RelayMessage::Event { event, .. } = message { // Skip if the event has already been processed if !processed.insert(event.id) { continue; } match event.kind { Kind::Metadata => { let metadata = Metadata::from_json(&event.content).unwrap_or_default(); let person = Person::new(event.pubkey, metadata); let val = Box::new(person); // Send tx.send_async(Dispatch::Person(val)).await.ok(); } Kind::ContactList => { let public_keys = event.extract_public_keys(); // Get metadata for all public keys get_metadata(client, public_keys).await.ok(); } Kind::InboxRelays => { let val = Box::new(event.into_owned()); // Send tx.send_async(Dispatch::Relays(val)).await.ok(); } Kind::Custom(10044) => { let val = Box::new(event.into_owned()); // Send tx.send_async(Dispatch::Announcement(val)).await.ok(); } _ => {} } } } } /// Handle request for metadata async fn handle_requests(client: &Client, rx: &flume::Receiver) { let mut batch: HashSet = HashSet::new(); loop { match flume::Selector::new() .recv(rx, |result| result.ok()) .wait_timeout(Duration::from_secs(TIMEOUT)) { Ok(Some(public_key)) => { batch.insert(public_key); // Process the batch if it's full if batch.len() >= 20 { get_metadata(client, std::mem::take(&mut batch)).await.ok(); } } _ => { if !batch.is_empty() { get_metadata(client, std::mem::take(&mut batch)).await.ok(); } } } } } /// Load all user profiles from the database fn load(&mut self, cx: &mut Context) { let nostr = NostrRegistry::global(cx); let client = nostr.read(cx).client(); let task: Task, Error>> = cx.background_spawn(async move { let filter = Filter::new().kind(Kind::Metadata).limit(200); let events = client.database().query(filter).await?; let persons = events .into_iter() .map(|event| { let metadata = Metadata::from_json(event.content).unwrap_or_default(); Person::new(event.pubkey, metadata) }) .collect(); Ok(persons) }); self.tasks.push(cx.spawn(async move |this, cx| { if let Ok(persons) = task.await { this.update(cx, |this, cx| { this.bulk_inserts(persons, cx); }) .ok(); } })); } /// Set profile encryption keys announcement fn set_announcement(&mut self, event: &Event, cx: &mut App) { let announcement = Announcement::from(event); if let Some(person) = self.persons.get(&event.pubkey) { person.update(cx, |person, cx| { person.set_announcement(announcement); cx.notify(); }); } else { let person = Person::new(event.pubkey, Metadata::default()).with_announcement(announcement); self.insert(person, cx); } } /// Set messaging relays for a person fn set_messaging_relays(&mut self, event: &Event, cx: &mut App) { let urls: Vec = nip17::extract_relay_list(event).cloned().collect(); if let Some(person) = self.persons.get(&event.pubkey) { person.update(cx, |person, cx| { person.set_messaging_relays(urls); cx.notify(); }); } else { let person = Person::new(event.pubkey, Metadata::default()).with_messaging_relays(urls); self.insert(person, cx); } } /// Insert batch of persons fn bulk_inserts(&mut self, persons: Vec, cx: &mut Context) { for person in persons.into_iter() { let public_key = person.public_key(); self.persons .entry(public_key) .or_insert_with(|| cx.new(|_| person)); } cx.notify(); } /// Insert or update a person pub fn insert(&mut self, person: Person, cx: &mut App) { let public_key = person.public_key(); match self.persons.get(&public_key) { Some(this) => { this.update(cx, |this, cx| { this.set_metadata(person.metadata()); cx.notify(); }); } None => { self.persons.insert(public_key, cx.new(|_| person)); } } } /// Get single person by public key pub fn get(&self, public_key: &PublicKey, cx: &App) -> Person { if let Some(person) = self.persons.get(public_key) { return person.read(cx).clone(); } let public_key = *public_key; let mut seen = self.seens.borrow_mut(); if seen.insert(public_key) { let sender = self.sender.clone(); // Spawn background task to request metadata cx.background_spawn(async move { if let Err(e) = sender.send_async(public_key).await { log::warn!("Failed to send public key for metadata request: {}", e); } }) .detach(); } // Return a temporary profile with default metadata Person::new(public_key, Metadata::default()) } } /// Get metadata for all public keys in a event async fn get_metadata(client: &Client, public_keys: I) -> Result<(), Error> where I: IntoIterator, { let authors: Vec = public_keys.into_iter().collect(); let limit = authors.len(); if authors.is_empty() { return Err(anyhow!("You need at least one public key")); } // Construct the subscription option let opts = SubscribeAutoCloseOptions::default() .exit_policy(ReqExitPolicy::ExitOnEOSE) .timeout(Some(Duration::from_secs(TIMEOUT))); // Construct the filter for metadata let filter = Filter::new() .kind(Kind::Metadata) .authors(authors) .limit(limit); // Construct target for subscription let target: HashMap<&str, Vec> = BOOTSTRAP_RELAYS .into_iter() .map(|relay| (relay, vec![filter.clone()])) .collect(); client.subscribe(target).close_on(opts).await?; Ok(()) }