use std::collections::HashSet; use std::os::unix::fs::PermissionsExt; use std::time::Duration; use anyhow::{anyhow, Error}; use common::{config_dir, BOOTSTRAP_RELAYS, CLIENT_NAME, SEARCH_RELAYS}; use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task}; use nostr_connect::prelude::*; use nostr_lmdb::NostrLmdb; use nostr_sdk::prelude::*; mod device; mod event; mod gossip; mod identity; pub use device::*; pub use event::*; pub use gossip::*; pub use identity::*; use crate::identity::Identity; pub fn init(cx: &mut App) { NostrRegistry::set_global(cx.new(NostrRegistry::new), cx); } /// Default timeout for subscription pub const TIMEOUT: u64 = 3; /// Default timeout for Nostr Connect pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; /// Default Nostr Connect relay pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; /// Default subscription id for device gift wrap events pub const DEVICE_GIFTWRAP: &str = "device-gift-wraps"; /// Default subscription id for user gift wrap events pub const USER_GIFTWRAP: &str = "user-gift-wraps"; struct GlobalNostrRegistry(Entity); impl Global for GlobalNostrRegistry {} /// Nostr Registry #[derive(Debug)] pub struct NostrRegistry { /// Nostr client client: Client, /// App keys /// /// Used for Nostr Connect and NIP-4e operations app_keys: Keys, /// Current identity (user's public key) /// /// Set by the current Nostr signer identity: Entity, /// Gossip implementation gossip: Entity, /// Tasks for asynchronous operations tasks: Vec>>, /// Subscriptions _subscriptions: Vec, } impl NostrRegistry { /// Retrieve the global nostr state pub fn global(cx: &App) -> Entity { cx.global::().0.clone() } /// Set the global nostr instance fn set_global(state: Entity, cx: &mut App) { cx.set_global(GlobalNostrRegistry(state)); } /// Create a new nostr instance fn new(cx: &mut Context) -> Self { // rustls uses the `aws_lc_rs` provider by default // This only errors if the default provider has already // been installed. We can ignore this `Result`. rustls::crypto::aws_lc_rs::default_provider() .install_default() .ok(); // Construct the nostr client options let opts = ClientOptions::new() .automatic_authentication(false) .verify_subscriptions(false) .sleep_when_idle(SleepWhenIdle::Enabled { timeout: Duration::from_secs(600), }); // Construct the lmdb let lmdb = cx.foreground_executor().block_on(async move { NostrLmdb::open(config_dir().join("nostr")) .await .expect("Failed to initialize database") }); // Construct the nostr client let client = ClientBuilder::default().database(lmdb).opts(opts).build(); let _ = tracker(); // Get the app keys let app_keys = Self::create_or_init_app_keys().unwrap(); // Construct the gossip entity let gossip = cx.new(|_| Gossip::default()); let async_gossip = gossip.downgrade(); // Construct the identity entity let identity = cx.new(|_| Identity::default()); // Channel for communication between nostr and gpui let (tx, rx) = flume::bounded::(2048); let mut subscriptions = vec![]; let mut tasks = vec![]; subscriptions.push( // Observe the identity entity cx.observe(&identity, |this, state, cx| { if state.read(cx).has_public_key() { match state.read(cx).relay_list_state() { RelayState::Initial => { this.get_relay_list(cx); } RelayState::Set => { if state.read(cx).messaging_relays_state() == RelayState::Initial { this.get_profile(cx); this.get_messaging_relays(cx); }; } _ => {} } } }), ); tasks.push( // Handle nostr notifications cx.background_spawn({ let client = client.clone(); async move { Self::handle_notifications(&client, &tx).await } }), ); tasks.push( // Update GPUI states cx.spawn(async move |_this, cx| { while let Ok(event) = rx.recv_async().await { match event.kind { Kind::RelayList => { async_gossip.update(cx, |this, cx| { this.insert_relays(&event); cx.notify(); })?; } Kind::InboxRelays => { async_gossip.update(cx, |this, cx| { this.insert_messaging_relays(&event); cx.notify(); })?; } _ => {} } } Ok(()) }), ); cx.defer(|cx| { let nostr = NostrRegistry::global(cx); nostr.update(cx, |this, cx| { this.get_identity(cx); }); }); Self { client, app_keys, identity, gossip, _subscriptions: subscriptions, tasks, } } /// Handle nostr notifications async fn handle_notifications(client: &Client, tx: &flume::Sender) -> Result<(), Error> { // Add bootstrap relay to the relay pool for url in BOOTSTRAP_RELAYS.into_iter() { client.add_relay(url).await?; } // Add search relay to the relay pool for url in SEARCH_RELAYS.into_iter() { client.add_relay(url).await?; } // Connect to all added relays client.connect().await; // Handle nostr notifications let mut notifications = client.notifications(); let mut processed_events = HashSet::new(); while let Ok(notification) = notifications.recv().await { if let RelayPoolNotification::Message { message, relay_url } = notification { match message { RelayMessage::Event { event, subscription_id, } => { if !processed_events.insert(event.id) { // Skip if the event has already been processed continue; } match event.kind { Kind::RelayList => { // Automatically get messaging relays for each member when the user opens a room if subscription_id.as_str().starts_with("room-") { Self::get_adv_events_by(client, event.as_ref()).await?; } tx.send_async(event.into_owned()).await?; } Kind::InboxRelays => { tx.send_async(event.into_owned()).await?; } _ => {} } } RelayMessage::Ok { event_id, message, .. } => { let msg = MachineReadablePrefix::parse(&message); let mut tracker = tracker().write().await; // Handle authentication messages if let Some(MachineReadablePrefix::AuthRequired) = msg { // Keep track of events that need to be resent after authentication tracker.add_to_pending(event_id, relay_url); } else { // Keep track of events sent by Coop tracker.sent(event_id) } } _ => {} } } } Ok(()) } /// Automatically get messaging relays and encryption announcement from a received relay list async fn get_adv_events_by(client: &Client, event: &Event) -> Result<(), Error> { // Subscription options let opts = SubscribeAutoCloseOptions::default() .timeout(Some(Duration::from_secs(TIMEOUT))) .exit_policy(ReqExitPolicy::ExitOnEOSE); // Extract write relays from event let write_relays: Vec<&RelayUrl> = nip65::extract_relay_list(event) .filter_map(|(url, metadata)| { if metadata.is_none() || metadata == &Some(RelayMetadata::Write) { Some(url) } else { None } }) .collect(); // Ensure relay connections for relay in write_relays.iter() { client.add_relay(*relay).await?; client.connect_relay(*relay).await?; } // Construct filter for inbox relays let inbox = Filter::new() .kind(Kind::InboxRelays) .author(event.pubkey) .limit(1); // Construct filter for encryption announcement let announcement = Filter::new() .kind(Kind::Custom(10044)) .author(event.pubkey) .limit(1); client .subscribe_to(write_relays, vec![inbox, announcement], Some(opts)) .await?; Ok(()) } /// Get or create a new app keys fn create_or_init_app_keys() -> Result { let dir = config_dir().join(".app_keys"); let content = match std::fs::read(&dir) { Ok(content) => content, Err(_) => { // Generate new keys if file doesn't exist let keys = Keys::generate(); let secret_key = keys.secret_key(); // Create directory and write secret key std::fs::create_dir_all(dir.parent().unwrap())?; std::fs::write(&dir, secret_key.to_secret_bytes())?; // Set permissions to readonly let mut perms = std::fs::metadata(&dir)?.permissions(); perms.set_mode(0o400); std::fs::set_permissions(&dir, perms)?; return Ok(keys); } }; let secret_key = SecretKey::from_slice(&content)?; let keys = Keys::new(secret_key); Ok(keys) } /// Get the nostr client pub fn client(&self) -> Client { self.client.clone() } /// Get the app keys pub fn app_keys(&self) -> &Keys { &self.app_keys } /// Get current identity pub fn identity(&self) -> Entity { self.identity.clone() } /// Get a relay hint (messaging relay) for a given public key pub fn relay_hint(&self, public_key: &PublicKey, cx: &App) -> Option { self.gossip .read(cx) .messaging_relays(public_key) .first() .cloned() } /// Get a list of write relays for a given public key pub fn write_relays(&self, public_key: &PublicKey, cx: &App) -> Task> { let client = self.client(); let relays = self.gossip.read(cx).write_relays(public_key); cx.background_spawn(async move { // Ensure relay connections for url in relays.iter() { client.add_relay(url).await.ok(); client.connect_relay(url).await.ok(); } relays }) } /// Get a list of read relays for a given public key pub fn read_relays(&self, public_key: &PublicKey, cx: &App) -> Task> { let client = self.client(); let relays = self.gossip.read(cx).read_relays(public_key); cx.background_spawn(async move { // Ensure relay connections for url in relays.iter() { client.add_relay(url).await.ok(); client.connect_relay(url).await.ok(); } relays }) } /// Get a list of messaging relays for a given public key pub fn messaging_relays(&self, public_key: &PublicKey, cx: &App) -> Task> { let client = self.client(); let relays = self.gossip.read(cx).messaging_relays(public_key); cx.background_spawn(async move { // Ensure relay connections for url in relays.iter() { client.add_relay(url).await.ok(); client.connect_relay(url).await.ok(); } relays }) } /// Set the signer for the nostr client and verify the public key pub fn set_signer(&mut self, signer: T, owned: bool, cx: &mut Context) where T: NostrSigner + 'static, { let client = self.client(); let identity = self.identity.downgrade(); // Create a task to update the signer and verify the public key let task: Task> = cx.background_spawn(async move { // Update signer client.set_signer(signer).await; // Verify signer let signer = client.signer().await?; let public_key = signer.get_public_key().await?; Ok(public_key) }); self.tasks.push(cx.spawn(async move |_this, cx| { match task.await { Ok(public_key) => { identity.update(cx, |this, cx| { this.set_public_key(public_key); this.reset_relay_state(); this.set_owned(owned); cx.notify(); })?; } Err(e) => { log::error!("Failed to set signer: {e}"); } }; Ok(()) })); } /// Unset the current signer pub fn unset_signer(&mut self, cx: &mut Context) { let client = self.client(); let async_identity = self.identity.downgrade(); self.tasks.push(cx.spawn(async move |_this, cx| { // Unset the signer from nostr client cx.background_executor() .await_on_background(async move { client.unset_signer().await; }) .await; // Unset the current identity async_identity .update(cx, |this, cx| { this.unset_public_key(); cx.notify(); }) .ok(); Ok(()) })); } // Get relay list for current user fn get_relay_list(&mut self, cx: &mut Context) { let client = self.client(); let async_identity = self.identity.downgrade(); let public_key = self.identity().read(cx).public_key(); let task: Task> = cx.background_spawn(async move { let filter = Filter::new() .kind(Kind::RelayList) .author(public_key) .limit(1); let mut stream = client .stream_events_from(BOOTSTRAP_RELAYS, vec![filter], Duration::from_secs(TIMEOUT)) .await?; while let Some((_url, res)) = stream.next().await { match res { Ok(event) => { log::info!("Received relay list event: {event:?}"); // Construct a filter to continuously receive relay list events let filter = Filter::new() .kind(Kind::RelayList) .author(public_key) .since(Timestamp::now()); // Subscribe to the relay list events client .subscribe_to(BOOTSTRAP_RELAYS, vec![filter], None) .await?; return Ok(RelayState::Set); } Err(e) => { log::error!("Failed to receive relay list event: {e}"); } } } Ok(RelayState::NotSet) }); self.tasks.push(cx.spawn(async move |_this, cx| { match task.await { Ok(state) => { async_identity .update(cx, |this, cx| { this.set_relay_list_state(state); cx.notify(); }) .ok(); } Err(e) => { log::error!("Failed to get relay list: {e}"); } } Ok(()) })); } /// Get profile and contact list for current user fn get_profile(&mut self, cx: &mut Context) { let client = self.client(); let public_key = self.identity().read(cx).public_key(); let write_relays = self.write_relays(&public_key, cx); let task: Task> = cx.background_spawn(async move { let mut urls = vec![]; urls.extend(write_relays.await); urls.extend( BOOTSTRAP_RELAYS .iter() .filter_map(|url| RelayUrl::parse(url).ok()), ); // Construct subscription options let opts = SubscribeAutoCloseOptions::default() .exit_policy(ReqExitPolicy::ExitOnEOSE) .timeout(Some(Duration::from_secs(TIMEOUT))); // Filter for metadata let metadata = Filter::new() .kind(Kind::Metadata) .limit(1) .author(public_key); // Filter for contact list let contact_list = Filter::new() .kind(Kind::ContactList) .limit(1) .author(public_key); client .subscribe_to(urls, vec![metadata, contact_list], Some(opts)) .await?; Ok(()) }); task.detach(); } /// Get messaging relays for current user fn get_messaging_relays(&mut self, cx: &mut Context) { let client = self.client(); let async_identity = self.identity.downgrade(); let public_key = self.identity().read(cx).public_key(); let write_relays = self.write_relays(&public_key, cx); let task: Task> = cx.background_spawn(async move { let urls = write_relays.await; // Construct the filter for inbox relays let filter = Filter::new() .kind(Kind::InboxRelays) .author(public_key) .limit(1); // Stream events from the write relays let mut stream = client .stream_events_from(&urls, vec![filter], Duration::from_secs(TIMEOUT)) .await?; while let Some((_url, res)) = stream.next().await { match res { Ok(event) => { log::info!("Received messaging relays event: {event:?}"); // Construct a filter to continuously receive relay list events let filter = Filter::new() .kind(Kind::InboxRelays) .author(public_key) .since(Timestamp::now()); // Subscribe to the relay list events client.subscribe_to(&urls, vec![filter], None).await?; return Ok(RelayState::Set); } Err(e) => { log::error!("Failed to get messaging relays: {e}"); } } } Ok(RelayState::NotSet) }); self.tasks.push(cx.spawn(async move |_this, cx| { match task.await { Ok(state) => { async_identity .update(cx, |this, cx| { this.set_messaging_relays_state(state); cx.notify(); }) .ok(); } Err(e) => { log::error!("Failed to get messaging relays: {e}"); } } Ok(()) })); } /// Set the metadata for the current user pub fn set_metadata(&self, metadata: &Metadata, cx: &App) -> Task> { let client = self.client(); let public_key = self.identity().read(cx).public_key(); let write_relays = self.write_relays(&public_key, cx); let metadata = metadata.clone(); cx.background_spawn(async move { let urls = write_relays.await; let signer = client.signer().await?; // Sign the new metadata event let event = EventBuilder::metadata(&metadata).sign(&signer).await?; // Send event to user's write relayss client.send_event_to(urls, &event).await?; Ok(()) }) } /// Get local stored identity fn get_identity(&mut self, cx: &mut Context) { let read_credential = cx.read_credentials(CLIENT_NAME); self.tasks.push(cx.spawn(async move |this, cx| { match read_credential.await { Ok(Some((_, secret))) => { let secret = SecretKey::from_slice(&secret)?; let keys = Keys::new(secret); this.update(cx, |this, cx| { this.set_signer(keys, false, cx); }) .ok(); } _ => { this.update(cx, |this, cx| { this.get_bunker(cx); }) .ok(); } } Ok(()) })); } /// Create a new identity fn create_identity(&mut self, cx: &mut Context) { let keys = Keys::generate(); let write_credential = cx.write_credentials( CLIENT_NAME, &keys.public_key().to_hex(), &keys.secret_key().to_secret_bytes(), ); // Update the signer self.set_signer(keys, false, cx); // TODO: set metadata // Spawn a task to write the credentials cx.background_spawn(async move { if let Err(e) = write_credential.await { log::error!("Failed to write credentials: {}", e); } }) .detach(); } /// Get local stored bunker connection fn get_bunker(&mut self, cx: &mut Context) { let client = self.client(); let app_keys = self.app_keys().clone(); let timeout = Duration::from_secs(NOSTR_CONNECT_TIMEOUT); let task: Task> = cx.background_spawn(async move { log::info!("Getting bunker connection"); let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier("coop:account") .limit(1); if let Some(event) = client.database().query(filter).await?.first_owned() { let uri = NostrConnectUri::parse(event.content)?; let signer = NostrConnect::new(uri.clone(), app_keys.clone(), timeout, None)?; Ok(signer) } else { Err(anyhow!("No account found")) } }); self.tasks.push(cx.spawn(async move |this, cx| { match task.await { Ok(signer) => { this.update(cx, |this, cx| { this.set_signer(signer, true, cx); }) .ok(); } Err(e) => { log::warn!("Failed to get bunker: {e}"); // Create a new identity if no stored bunker exists this.update(cx, |this, cx| { this.create_identity(cx); }) .ok(); } } Ok(()) })); } /// Store the bunker connection for the next login pub fn persist_bunker(&mut self, uri: NostrConnectUri, cx: &mut App) { let client = self.client(); let rng_keys = Keys::generate(); self.tasks.push(cx.background_spawn(async move { // Construct the event for application-specific data let event = EventBuilder::new(Kind::ApplicationSpecificData, uri.to_string()) .tag(Tag::identifier("coop:account")) .sign(&rng_keys) .await?; // Store the event in the database client.database().save_event(&event).await?; Ok(()) })); } /// Generate a direct nostr connection initiated by the client pub fn client_connect(&self, relay: Option) -> (NostrConnect, NostrConnectUri) { let app_keys = self.app_keys(); let timeout = Duration::from_secs(NOSTR_CONNECT_TIMEOUT); // Determine the relay will be used for Nostr Connect let relay = match relay { Some(relay) => relay, None => RelayUrl::parse(NOSTR_CONNECT_RELAY).unwrap(), }; // Generate the nostr connect uri let uri = NostrConnectUri::client(app_keys.public_key(), vec![relay], CLIENT_NAME); // Generate the nostr connect let mut signer = NostrConnect::new(uri.clone(), app_keys.clone(), timeout, None).unwrap(); // Handle the auth request signer.auth_url_handler(CoopAuthUrlHandler); (signer, uri) } } #[derive(Debug, Clone)] pub struct CoopAuthUrlHandler; impl AuthUrlHandler for CoopAuthUrlHandler { #[allow(mismatched_lifetime_syntaxes)] fn on_auth_url(&self, auth_url: Url) -> BoxedFuture> { Box::pin(async move { webbrowser::open(auth_url.as_str())?; Ok(()) }) } }