diff --git a/crates/coop/src/chatspace.rs b/crates/coop/src/chatspace.rs index 5067353..3901c0a 100644 --- a/crates/coop/src/chatspace.rs +++ b/crates/coop/src/chatspace.rs @@ -342,7 +342,7 @@ impl ChatSpace { ) -> impl IntoElement { let proxy = AppSettings::get_proxy_user_avatars(cx); let need_backup = Identity::read_global(cx).need_backup(); - let relay_ready = Identity::read_global(cx).relay_ready(); + let has_dm_relays = Identity::read_global(cx).has_dm_relays(); let updating = AutoUpdater::read_global(cx).status.is_updating(); let updated = AutoUpdater::read_global(cx).status.is_updated(); @@ -377,7 +377,7 @@ impl ChatSpace { }), ) }) - .when_some(relay_ready, |this, status| { + .when_some(has_dm_relays, |this, status| { this.when(!status, |this| this.child(messaging_relays::relay_button())) }) .when_some(need_backup, |this, keys| { diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index 43c97ca..884055f 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -9,7 +9,7 @@ use global::constants::{ APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT, SEARCH_RELAYS, WAIT_FOR_FINISH, }; -use global::{gift_wrap_sub_id, nostr_client, starting_time, NostrSignal}; +use global::{nostr_client, processed_events, starting_time, NostrSignal}; use gpui::{ actions, point, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations, @@ -50,9 +50,7 @@ fn main() { let (signal_tx, signal_rx) = channel::bounded::(2048); let (mta_tx, mta_rx) = channel::bounded::(1024); let (event_tx, event_rx) = channel::bounded::(2048); - let signal_tx_clone = signal_tx.clone(); - let mta_tx_clone = mta_tx.clone(); app.background_executor() .spawn(async move { @@ -64,9 +62,7 @@ fn main() { // Handle Nostr notifications. // // Send the redefined signal back to GPUI via channel. - if let Err(e) = - handle_nostr_notifications(client, &signal_tx_clone, &mta_tx_clone, &event_tx).await - { + if let Err(e) = handle_nostr_notifications(&signal_tx_clone, &event_tx).await { log::error!("Failed to handle Nostr notifications: {e}"); } }) @@ -75,6 +71,7 @@ fn main() { app.background_executor() .spawn(async move { let duration = Duration::from_millis(METADATA_BATCH_TIMEOUT); + let mut processed_pubkeys: BTreeSet = BTreeSet::new(); let mut batch: BTreeSet = BTreeSet::new(); /// Internal events for the metadata batching system @@ -102,20 +99,23 @@ fn main() { match smol::future::or(recv(), timeout()).await { BatchEvent::NewKeys(public_key) => { - batch.insert(public_key); - // Process immediately if batch limit reached + // Prevent duplicate keys from being processed + if processed_pubkeys.insert(public_key) { + batch.insert(public_key); + } + // Process the batch if it's full if batch.len() >= METADATA_BATCH_LIMIT { - sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await; + sync_data_for_pubkeys(std::mem::take(&mut batch)).await; } } BatchEvent::Timeout => { if !batch.is_empty() { - sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await; + sync_data_for_pubkeys(std::mem::take(&mut batch)).await; } } BatchEvent::Closed => { if !batch.is_empty() { - sync_data_for_pubkeys(client, std::mem::take(&mut batch)).await; + sync_data_for_pubkeys(std::mem::take(&mut batch)).await; } break; } @@ -243,7 +243,7 @@ fn main() { while let Ok(signal) = signal_rx.recv().await { cx.update(|window, cx| { let registry = Registry::global(cx); - let identity = Identity::read_global(cx); + let identity = Identity::global(cx); match signal { // Load chat rooms and stop the loading status @@ -267,15 +267,6 @@ fn main() { } }); } - // Load chat rooms without setting as finished - NostrSignal::Eose(subscription_id) => { - // Only load chat rooms if the subscription matches the gift wrap subscription - if gift_wrap_sub_id() == &subscription_id { - registry.update(cx, |this, cx| { - this.load_rooms(window, cx); - }); - } - } // Add the new metadata to the registry or update the existing one NostrSignal::Metadata(event) => { registry.update(cx, |this, cx| { @@ -284,12 +275,17 @@ fn main() { } // Convert the gift wrapped message to a message NostrSignal::GiftWrap(event) => { - if let Some(public_key) = identity.public_key() { + if let Some(public_key) = identity.read(cx).public_key() { registry.update(cx, |this, cx| { this.event_to_message(public_key, event, window, cx); }); } } + NostrSignal::DmRelaysFound => { + identity.update(cx, |this, cx| { + this.set_has_dm_relays(cx); + }); + } NostrSignal::Notice(_msg) => { // window.push_notification(msg, cx); } @@ -356,67 +352,116 @@ async fn connect(client: &Client) -> Result<(), Error> { } async fn handle_nostr_notifications( - client: &Client, signal_tx: &Sender, - mta_tx: &Sender, event_tx: &Sender, ) -> Result<(), Error> { - let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); + let client = nostr_client(); + let auto_close = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); let mut notifications = client.notifications(); - let mut processed_events: BTreeSet = BTreeSet::new(); - let mut processed_dm_relays: BTreeSet = BTreeSet::new(); + let mut processed_relay_list = false; + let mut processed_inbox_relay = false; while let Ok(notification) = notifications.recv().await { let RelayPoolNotification::Message { message, .. } = notification else { continue; }; - match message { - RelayMessage::Event { event, .. } => { - if processed_events.contains(&event.id) { - continue; - } - // Skip events that have already been processed - processed_events.insert(event.id); + let RelayMessage::Event { event, .. } = message else { + continue; + }; - match event.kind { - Kind::GiftWrap => { - event_tx.send(event.into_owned()).await.ok(); - } - Kind::Metadata => { - signal_tx - .send(NostrSignal::Metadata(event.into_owned())) - .await - .ok(); - } - Kind::ContactList => { - if let Ok(true) = check_author(client, &event).await { - for public_key in event.tags.public_keys().copied() { - mta_tx.send(public_key).await.ok(); - } - } - } - Kind::RelayList => { - if processed_dm_relays.contains(&event.pubkey) { - continue; - } - // Skip public keys that have already been processed - processed_dm_relays.insert(event.pubkey); + // Skip events that have already been processed + if !processed_events().write().await.insert(event.id) { + continue; + } - let filter = Filter::new() - .author(event.pubkey) - .kind(Kind::InboxRelays) - .limit(1); - - client.subscribe(filter, Some(opts)).await.ok(); + match event.kind { + Kind::RelayList => { + // Get metadata for event's pubkey that matches the current user's pubkey + if let Ok(true) = is_from_current_user(&event).await { + match processed_relay_list { + true => continue, + false => processed_relay_list = true, } - _ => {} + + let sub_id = SubscriptionId::new("metadata"); + let filter = Filter::new() + .kinds(vec![Kind::Metadata, Kind::ContactList, Kind::InboxRelays]) + .author(event.pubkey) + .limit(10); + + client + .subscribe_with_id(sub_id, filter, Some(auto_close)) + .await + .ok(); } } - RelayMessage::EndOfStoredEvents(subscription_id) => { + Kind::InboxRelays => { + if let Ok(true) = is_from_current_user(&event).await { + match processed_inbox_relay { + true => continue, + false => processed_inbox_relay = true, + } + + // Get all inbox relays + let relays = event + .tags + .filter_standardized(TagKind::Relay) + .filter_map(|t| { + if let TagStandard::Relay(url) = t { + Some(url.to_owned()) + } else { + None + } + }) + .collect_vec(); + + if !relays.is_empty() { + // Add relays to nostr client + for relay in relays.iter() { + _ = client.add_relay(relay).await; + _ = client.connect_relay(relay).await; + } + + log::info!("Connected to messaging relays"); + + let filter = Filter::new().kind(Kind::GiftWrap).pubkey(event.pubkey); + let sub_id = SubscriptionId::new("gift-wrap"); + + // Notify the UI that the current user has set up the DM relays + signal_tx.send(NostrSignal::DmRelaysFound).await.ok(); + + if client + .subscribe_with_id_to(relays.clone(), sub_id, filter, None) + .await + .is_ok() + { + log::info!("Subscribing to gift wrap events in: {relays:?}"); + } + } + } + } + Kind::ContactList => { + if let Ok(true) = is_from_current_user(&event).await { + let public_keys: Vec = event.tags.public_keys().copied().collect(); + let kinds = vec![Kind::Metadata, Kind::ContactList]; + let lens = public_keys.len() * kinds.len(); + let filter = Filter::new().limit(lens).authors(public_keys).kinds(kinds); + + client + .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(auto_close)) + .await + .ok(); + } + } + Kind::Metadata => { signal_tx - .send(NostrSignal::Eose(subscription_id.into_owned())) - .await?; + .send(NostrSignal::Metadata(event.into_owned())) + .await + .ok(); + } + Kind::GiftWrap => { + event_tx.send(event.into_owned()).await.ok(); } _ => {} } @@ -425,28 +470,28 @@ async fn handle_nostr_notifications( Ok(()) } -async fn check_author(client: &Client, event: &Event) -> Result { +async fn is_from_current_user(event: &Event) -> Result { + let client = nostr_client(); let signer = client.signer().await?; let public_key = signer.get_public_key().await?; Ok(public_key == event.pubkey) } -async fn sync_data_for_pubkeys(client: &Client, public_keys: BTreeSet) { +async fn sync_data_for_pubkeys(public_keys: BTreeSet) { + let client = nostr_client(); let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); - let kinds = vec![Kind::Metadata, Kind::ContactList, Kind::RelayList]; + let kinds = vec![Kind::Metadata, Kind::ContactList]; let filter = Filter::new() .limit(public_keys.len() * kinds.len()) .authors(public_keys) .kinds(kinds); - if let Err(e) = client + client .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts)) .await - { - log::error!("Failed to sync metadata: {e}"); - } + .ok(); } /// Stores an unwrapped event in local database with reference to original diff --git a/crates/coop/src/views/messaging_relays.rs b/crates/coop/src/views/messaging_relays.rs index ecfc576..cf2dd20 100644 --- a/crates/coop/src/views/messaging_relays.rs +++ b/crates/coop/src/views/messaging_relays.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::{anyhow, Error}; -use global::constants::{GIFT_WRAP_SUB_ID, NIP17_RELAYS}; +use global::constants::NIP17_RELAYS; use global::nostr_client; use gpui::prelude::FluentBuilder; use gpui::{ @@ -10,8 +10,6 @@ use gpui::{ TextAlign, UniformList, Window, }; use i18n::{shared_t, t}; -use identity::Identity; -use itertools::Itertools; use nostr_sdk::prelude::*; use smallvec::{smallvec, SmallVec}; use theme::ActiveTheme; @@ -189,15 +187,12 @@ impl MessagingRelays { let task: Task> = cx.background_spawn(async move { let client = nostr_client(); - let signer = client.signer().await?; - let public_key = signer.get_public_key().await?; + let tags: Vec = relays + .iter() + .map(|relay| Tag::relay(relay.clone())) + .collect(); - let builder = EventBuilder::new(Kind::InboxRelays, "").tags( - relays - .iter() - .map(|relay| Tag::relay(relay.clone())) - .collect_vec(), - ); + let builder = EventBuilder::new(Kind::InboxRelays, "").tags(tags); // Set messaging relays client.send_event_builder(builder).await?; @@ -208,15 +203,6 @@ impl MessagingRelays { _ = client.connect_relay(&relay).await; } - let id = SubscriptionId::new(GIFT_WRAP_SUB_ID); - let new_messages = Filter::new().kind(Kind::GiftWrap).pubkey(public_key); - - // Close old subscriptions - client.unsubscribe(&id).await; - - // Subscribe to new messages - client.subscribe_with_id(id, new_messages, None).await?; - Ok(()) }); @@ -224,10 +210,6 @@ impl MessagingRelays { match task.await { Ok(_) => { cx.update(|window, cx| { - Identity::global(cx).update(cx, |this, cx| { - this.verify_dm_relays(window, cx); - }); - // Close the current modal window.close_modal(cx); }) .ok(); diff --git a/crates/coop/src/views/sidebar/mod.rs b/crates/coop/src/views/sidebar/mod.rs index c11cef1..4c42d8d 100644 --- a/crates/coop/src/views/sidebar/mod.rs +++ b/crates/coop/src/views/sidebar/mod.rs @@ -580,6 +580,7 @@ impl Sidebar { }); } + #[allow(dead_code)] fn skeletons(&self, total: i32) -> impl IntoIterator { (0..total).map(|_| { div() @@ -788,16 +789,6 @@ impl Render for Sidebar { ), ), ) - .when(registry.loading, |this| { - this.child( - div() - .flex_1() - .flex() - .flex_col() - .gap_1() - .children(self.skeletons(1)), - ) - }) .child( uniform_list( "rooms", diff --git a/crates/global/src/constants.rs b/crates/global/src/constants.rs index 913c02a..ec2103a 100644 --- a/crates/global/src/constants.rs +++ b/crates/global/src/constants.rs @@ -8,10 +8,9 @@ pub const ACCOUNT_D: &str = "coop:account"; pub const SETTINGS_D: &str = "coop:settings"; /// Bootstrap Relays. -pub const BOOTSTRAP_RELAYS: [&str; 5] = [ +pub const BOOTSTRAP_RELAYS: [&str; 4] = [ "wss://relay.damus.io", "wss://relay.primal.net", - "wss://nostr.Wine", "wss://user.kindpag.es", "wss://purplepag.es", ]; @@ -36,14 +35,11 @@ pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; /// Default timeout (in seconds) for Nostr Connect pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; -/// Unique ID for all gift wraps subscription. -pub const GIFT_WRAP_SUB_ID: &str = "listen_for_giftwraps"; - /// Total metadata requests will be grouped. pub const METADATA_BATCH_LIMIT: usize = 100; -/// Maximum timeout for grouping metadata requests. -pub const METADATA_BATCH_TIMEOUT: u64 = 400; +/// Maximum timeout for grouping metadata requests. (milliseconds) +pub const METADATA_BATCH_TIMEOUT: u64 = 300; /// Maximum timeout for waiting for finish (seconds) pub const WAIT_FOR_FINISH: u64 = 60; diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index ae32124..d7ef10a 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -1,11 +1,12 @@ +use std::collections::BTreeSet; use std::sync::OnceLock; use std::time::Duration; use nostr_connect::prelude::*; use nostr_sdk::prelude::*; use paths::nostr_file; +use smol::lock::RwLock; -use crate::constants::GIFT_WRAP_SUB_ID; use crate::paths::support_dir; pub mod constants; @@ -26,15 +27,15 @@ pub enum NostrSignal { /// Partially finished processing all gift wrap events PartialFinish, - /// Receives EOSE response from relay pool - Eose(SubscriptionId), + /// DM relays have been found + DmRelaysFound, /// Notice from Relay Pool Notice(String), } static NOSTR_CLIENT: OnceLock = OnceLock::new(); -static GIFT_WRAP_ID: OnceLock = OnceLock::new(); +static PROCESSED_EVENTS: OnceLock>> = OnceLock::new(); static CURRENT_TIMESTAMP: OnceLock = OnceLock::new(); static FIRST_RUN: OnceLock = OnceLock::new(); @@ -50,22 +51,20 @@ pub fn nostr_client() -> &'static Client { let lmdb = NostrLMDB::open(nostr_file()).expect("Database is NOT initialized"); let opts = ClientOptions::new() - // Coop isn't social client, - // but it needs this option because it needs user's NIP65 Relays to fetch NIP17 Relays. .gossip(true) - // TODO: Coop should handle authentication by itself .automatic_authentication(true) - // Sleep after idle for 5 seconds + .verify_subscriptions(false) + // Sleep after idle for 20 seconds .sleep_when_idle(SleepWhenIdle::Enabled { - timeout: Duration::from_secs(10), + timeout: Duration::from_secs(20), }); ClientBuilder::default().database(lmdb).opts(opts).build() }) } -pub fn gift_wrap_sub_id() -> &'static SubscriptionId { - GIFT_WRAP_ID.get_or_init(|| SubscriptionId::new(GIFT_WRAP_SUB_ID)) +pub fn processed_events() -> &'static RwLock> { + PROCESSED_EVENTS.get_or_init(|| RwLock::new(BTreeSet::new())) } pub fn starting_time() -> &'static Timestamp { diff --git a/crates/identity/src/lib.rs b/crates/identity/src/lib.rs index 57f3f2e..8d973f3 100644 --- a/crates/identity/src/lib.rs +++ b/crates/identity/src/lib.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Error}; use client_keys::ClientKeys; use common::handle_auth::CoopAuthUrlHandler; use global::constants::{ACCOUNT_D, NIP17_RELAYS, NIP65_RELAYS, NOSTR_CONNECT_TIMEOUT}; -use global::{gift_wrap_sub_id, nostr_client}; +use global::nostr_client; use gpui::prelude::FluentBuilder; use gpui::{ div, red, App, AppContext, Context, Entity, Global, ParentElement, SharedString, Styled, @@ -29,7 +29,7 @@ impl Global for GlobalIdentity {} pub struct Identity { public_key: Option, logging_in: bool, - relay_ready: Option, + has_dm_relays: Option, need_backup: Option, need_onboarding: bool, #[allow(dead_code)] @@ -73,8 +73,8 @@ impl Identity { Self { public_key: None, - relay_ready: None, need_backup: None, + has_dm_relays: None, need_onboarding: false, logging_in: false, subscriptions, @@ -129,6 +129,7 @@ impl Identity { // Unset signer client.unset_signer().await; + // Delete account client.database().delete(filter).await?; @@ -345,7 +346,7 @@ impl Identity { } /// Sets a new signer for the client and updates user identity - pub fn set_signer(&self, signer: S, window: &mut Window, cx: &mut Context) + pub fn set_signer(&mut self, signer: S, window: &mut Window, cx: &mut Context) where S: NostrSigner + 'static, { @@ -357,7 +358,7 @@ impl Identity { client.set_signer(signer).await; // Subscribe for user metadata - Self::subscribe(client, public_key).await?; + get_nip65_relays(public_key).await?; Ok(public_key) }); @@ -422,11 +423,12 @@ impl Identity { // Set user's NIP65 relays client.send_event_builder(relay_list).await?; + // Set user's NIP17 relays client.send_event_builder(dm_relay).await?; - // Subscribe for user metadata - Self::subscribe(client, public_key).await?; + // Get user's NIP65 relays + get_nip65_relays(public_key).await?; Ok(public_key) }); @@ -547,60 +549,15 @@ impl Identity { .detach(); } - pub fn verify_dm_relays(&self, window: &mut Window, cx: &mut Context) { - let Some(public_key) = self.public_key() else { - return; - }; - - let task: Task = cx.background_spawn(async move { - let client = nostr_client(); - let filter = Filter::new() - .kind(Kind::InboxRelays) - .author(public_key) - .limit(1); - - let Ok(events) = client.database().query(filter).await else { - return false; - }; - - let Some(event) = events.first() else { - return false; - }; - - let relays: Vec = event - .tags - .filter(TagKind::Relay) - .filter_map(|tag| RelayUrl::parse(tag.content()?).ok()) - .collect(); - - !relays.is_empty() - }); - - cx.spawn_in(window, async move |this, cx| { - let result = task.await; - - this.update(cx, |this, cx| { - this.relay_ready = Some(result); - cx.notify(); - }) - .ok(); - }) - .detach(); - } - /// Sets the public key of the identity pub(crate) fn set_public_key( &mut self, public_key: Option, - window: &mut Window, + _window: &mut Window, cx: &mut Context, ) { self.public_key = public_key; cx.notify(); - // Run verify user's dm relays task - cx.defer_in(window, |this, window, cx| { - this.verify_dm_relays(window, cx); - }); } /// Returns the current identity's public key @@ -613,8 +570,9 @@ impl Identity { self.public_key.is_some() } - pub fn relay_ready(&self) -> Option { - self.relay_ready + /// Returns true if the identity has DM Relays + pub fn has_dm_relays(&self) -> Option { + self.has_dm_relays } /// Returns true if the identity is currently logging in @@ -622,45 +580,32 @@ impl Identity { self.logging_in } + /// Sets the DM Relays status of the identity + pub fn set_has_dm_relays(&mut self, cx: &mut Context) { + self.has_dm_relays = Some(true); + cx.notify(); + } + /// Sets the logging in status of the identity pub(crate) fn set_logging_in(&mut self, status: bool, cx: &mut Context) { self.logging_in = status; cx.notify(); } - - pub(crate) async fn subscribe(client: &Client, public_key: PublicKey) -> Result<(), Error> { - let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); - - client - .subscribe_with_id( - gift_wrap_sub_id().to_owned(), - Filter::new().kind(Kind::GiftWrap).pubkey(public_key), - None, - ) - .await?; - - client - .subscribe( - Filter::new() - .author(public_key) - .kinds(vec![Kind::Metadata, Kind::ContactList, Kind::RelayList]) - .since(Timestamp::now()), - None, - ) - .await?; - - client - .subscribe( - Filter::new() - .kinds(vec![Kind::Metadata, Kind::ContactList, Kind::RelayList]) - .author(public_key) - .limit(10), - Some(opts), - ) - .await?; - - log::info!("Getting all user's metadata and messages..."); - - Ok(()) - } +} + +async fn get_nip65_relays(public_key: PublicKey) -> Result<(), Error> { + let client = nostr_client(); + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); + let sub_id = SubscriptionId::new("nip65-relays"); + + let filter = Filter::new() + .kind(Kind::RelayList) + .author(public_key) + .limit(1); + + if client.subscription(&sub_id).await.is_empty() { + client.subscribe_with_id(sub_id, filter, Some(opts)).await?; + } + + Ok(()) }