use std::collections::HashMap; use std::time::Duration; use anyhow::{Error, anyhow}; use common::config_dir; use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, SharedString, Task, Window}; use nostr_connect::prelude::*; use nostr_gossip_memory::prelude::*; use nostr_lmdb::prelude::*; use nostr_sdk::prelude::*; mod blossom; mod constants; mod nip05; mod nip4e; pub use blossom::*; pub use constants::*; pub use nip4e::*; pub use nip05::*; pub fn init(window: &mut Window, cx: &mut App) { // rustls uses the `aws_lc_rs` provider by default // This only errors if the default provider has already // been installed. We can ignore this `Result`. rustls::crypto::aws_lc_rs::default_provider() .install_default() .ok(); // Initialize the tokio runtime gpui_tokio::init(cx); NostrRegistry::set_global(cx.new(|cx| NostrRegistry::new(window, cx)), cx); } struct GlobalNostrRegistry(Entity); impl Global for GlobalNostrRegistry {} /// Signer event. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum StateEvent { /// Connecting to the bootstrapping relay Connecting, /// Connected to the bootstrapping relay Connected, /// An error occurred Error(SharedString), } impl StateEvent { pub fn error(error: T) -> Self where T: Into, { Self::Error(error.into()) } } /// Nostr Registry #[derive(Debug)] pub struct NostrRegistry { /// Nostr client client: Client, /// Currently active signer pub signer: Entity>, /// Tasks for asynchronous operations tasks: Vec>>, } impl EventEmitter for NostrRegistry {} impl NostrRegistry { /// Retrieve the global nostr state pub fn global(cx: &App) -> Entity { cx.global::().0.clone() } /// Set the global nostr instance fn set_global(state: Entity, cx: &mut App) { cx.set_global(GlobalNostrRegistry(state)); } /// Create a new nostr instance fn new(window: &mut Window, cx: &mut Context) -> Self { let signer = cx.new(|_| None); // Construct the nostr lmdb instance let lmdb = cx.foreground_executor().block_on(async move { NostrLmdb::open(config_dir().join("nostr")) .await .expect("Failed to initialize database") }); // Construct the nostr client let client = ClientBuilder::default() .database(lmdb) .gossip(NostrGossipMemory::unbounded()) .gossip_config( GossipConfig::default() .sync_initial_timeout(Duration::from_millis(100)) .sync_idle_timeout(Duration::from_millis(100)) .no_background_refresh(), ) .connect_timeout(Duration::from_secs(10)) .sleep_when_idle(SleepWhenIdle::Enabled { timeout: Duration::from_secs(600), }) .build(); // Run at the end of current cycle cx.defer_in(window, |this, _window, cx| { this.connect(cx); }); Self { client, signer, tasks: vec![], } } /// Get the nostr client pub fn client(&self) -> Client { self.client.clone() } /// Get the signer pub fn signer(&self, cx: &App) -> Option { self.signer.read(cx).clone() } /// Get the public key of the signer pub fn signer_pubkey(&self, cx: &App) -> Option { self.signer.read(cx).as_ref().map(|s| s.public_key()) } /// Set the signer to the given keys pub fn set_signer(&mut self, new_keys: Keys, cx: &mut Context) { self.signer.update(cx, |this, cx| { *this = Some(new_keys); cx.notify(); }); } /// Connect to the bootstrapping relays fn connect(&mut self, cx: &mut Context) { let client = self.client(); let task: Task> = cx.background_spawn(async move { // Add indexer relay to the relay pool for url in INDEXER_RELAYS.into_iter() { client .add_relay(url) .capabilities(RelayCapabilities::DISCOVERY) .await?; } // Add bootstrap relay to the relay pool for url in BOOTSTRAP_RELAYS.into_iter() { client.add_relay(url).await?; } // Connect to all added relays client.connect().await; Ok(()) }); // Emit connecting event cx.emit(StateEvent::Connecting); self.tasks.push(cx.spawn(async move |this, cx| { if let Err(e) = task.await { this.update(cx, |_this, cx| { cx.emit(StateEvent::error(e.to_string())); })?; } else { this.update(cx, |_this, cx| { cx.emit(StateEvent::Connected); })?; } Ok(()) })); } /// Get the public key of a NIP-05 address pub fn query_address(&self, addr: Nip05Address, cx: &App) -> Task> { let client = self.client(); let http_client = cx.http_client(); cx.background_spawn(async move { let profile = addr.profile(&http_client).await?; let public_key = profile.public_key; let opts = SubscribeAutoCloseOptions::default() .exit_policy(ReqExitPolicy::ExitOnEOSE) .timeout(Some(Duration::from_secs(3))); // Construct the filter for the metadata event let filter = Filter::new() .kind(Kind::Metadata) .author(public_key) .limit(1); // Construct target for subscription let target: HashMap<&str, Vec> = BOOTSTRAP_RELAYS .into_iter() .map(|relay| (relay, vec![filter.clone()])) .collect(); client.subscribe(target).close_on(opts).await?; Ok(public_key) }) } /// Perform a NIP-50 global search for user profiles based on a given query pub fn search(&self, query: &str, cx: &App) -> Task, Error>> { let client = self.client(); let query = query.to_string(); // Get the address task if the query is a valid NIP-05 address let address_task = if let Ok(addr) = Nip05Address::parse(&query) { Some(self.query_address(addr, cx)) } else { None }; cx.background_spawn(async move { let mut results: Vec = Vec::with_capacity(FIND_LIMIT); // Return early if the query is a valid NIP-05 address if let Some(task) = address_task && let Ok(public_key) = task.await { results.push(public_key); return Ok(results); } // Add search relay to the relay pool for url in SEARCH_RELAYS.into_iter() { if client.relay(url).await.is_ok() { client .add_relay(url) .capabilities(RelayCapabilities::READ) .await?; } else { return Err(anyhow!("Failed to add search relay: {}", url)); } } // Return early if the query is a valid public key if let Ok(public_key) = PublicKey::parse(&query) { results.push(public_key); return Ok(results); } // Construct the filter for the search query let filter = Filter::new() .search(query.to_lowercase()) .kind(Kind::Metadata) .limit(FIND_LIMIT); // Construct target for subscription let target: HashMap<&str, Vec> = SEARCH_RELAYS .into_iter() .map(|relay| (relay, vec![filter.clone()])) .collect(); // Stream events from the search relays let mut stream = client .stream_events(target) .timeout(Duration::from_secs(TIMEOUT)) .await?; // Collect the results while let Some((_url, res)) = stream.next().await { if let Ok(event) = res { results.push(event.pubkey); } } if results.is_empty() { return Err(anyhow!("No results for query {query}")); } Ok(results) }) } /// Perform a WoT (via Vertex) search for a given query. pub fn wot_search(&self, query: &str, cx: &App) -> Task, Error>> { let client = self.client(); let query = query.to_string(); let Some(signer) = self.signer.read(cx).clone() else { return Task::ready(Err(anyhow!("Signer is required"))); }; cx.background_spawn(async move { // Construct a vertex request event let event = EventBuilder::new(Kind::Custom(5315), "") .tags(vec![ Tag::custom("param", vec!["search", &query]), Tag::custom("param", vec!["limit", "10"]), ]) .finalize_async(&signer) .await?; // Send the event to vertex relays let output = client.send_event(&event).to(WOT_RELAYS).await?; // Construct a filter to get the response or error from vertex let filter = Filter::new() .kinds(vec![Kind::Custom(6315), Kind::Custom(7000)]) .event(output.id().to_owned()); // Construct target for subscription let target: HashMap<&str, Vec> = WOT_RELAYS .into_iter() .map(|relay| (relay, vec![filter.clone()])) .collect(); // Stream events from the wot relays let mut stream = client .stream_events(target) .timeout(Duration::from_secs(TIMEOUT)) .await?; while let Some((_url, res)) = stream.next().await { if let Ok(event) = res { match event.kind { Kind::Custom(6315) => { let content: serde_json::Value = serde_json::from_str(&event.content)?; let pubkeys: Vec = content .as_array() .into_iter() .flatten() .filter_map(|item| item.as_object()) .filter_map(|obj| obj.get("pubkey").and_then(|v| v.as_str())) .filter_map(|pubkey_str| PublicKey::parse(pubkey_str).ok()) .collect(); return Ok(pubkeys); } Kind::Custom(7000) => { return Err(anyhow!("Search error")); } _ => {} } } } Err(anyhow!("No results for query: {query}")) }) } }