diff --git a/crates/auto_update/src/lib.rs b/crates/auto_update/src/lib.rs index 533fa35..943b1d5 100644 --- a/crates/auto_update/src/lib.rs +++ b/crates/auto_update/src/lib.rs @@ -126,9 +126,10 @@ impl AutoUpdater { self.set_status(AutoUpdateStatus::Downloading, cx); let task: Task> = cx.background_spawn(async move { + let database = shared_state().client().database(); let ids = event.tags.event_ids().copied(); let filter = Filter::new().ids(ids).kind(Kind::FileMetadata); - let events = shared_state().client.database().query(filter).await?; + let events = database.query(filter).await?; if let Some(event) = events.into_iter().find(|event| event.content == OS) { let tag = event.tags.find(TagKind::Url).context("url not found")?; diff --git a/crates/chats/src/lib.rs b/crates/chats/src/lib.rs index 1553bed..b86a944 100644 --- a/crates/chats/src/lib.rs +++ b/crates/chats/src/lib.rs @@ -1,5 +1,5 @@ -use std::cmp::Reverse; use std::collections::BTreeSet; +use std::{cmp::Reverse, collections::HashMap}; use anyhow::Error; use common::room_hash; @@ -46,13 +46,15 @@ pub enum RoomEmitter { pub struct ChatRegistry { /// Collection of all chat rooms pub rooms: Vec>, + /// Indicates if rooms are currently being loaded /// /// Always equal to `true` when the app starts - pub wait_for_eose: bool, + pub loading: bool, + /// Subscriptions for observing changes #[allow(dead_code)] - subscriptions: SmallVec<[Subscription; 2]>, + subscriptions: SmallVec<[Subscription; 1]>, } impl EventEmitter for ChatRegistry {} @@ -77,13 +79,6 @@ impl ChatRegistry { fn new(cx: &mut Context) -> Self { let mut subscriptions = smallvec![]; - // When the ChatRegistry is created, load all rooms from the local database - subscriptions.push(cx.observe_new::(|this, window, cx| { - if let Some(window) = window { - this.load_rooms(window, cx); - } - })); - // When any Room is created, load metadata for all members subscriptions.push(cx.observe_new::(|this, _window, cx| { this.load_metadata(cx).detach(); @@ -91,7 +86,7 @@ impl ChatRegistry { Self { rooms: vec![], - wait_for_eose: true, + loading: true, subscriptions, } } @@ -104,11 +99,6 @@ impl ChatRegistry { .cloned() } - /// Get room by its position. - pub fn room_by_ix(&self, ix: usize, _cx: &App) -> Option<&Entity> { - self.rooms.get(ix) - } - /// Get all ongoing rooms. pub fn ongoing_rooms(&self, cx: &App) -> Vec> { self.rooms @@ -162,12 +152,13 @@ impl ChatRegistry { /// 3. Determines each room's type based on message frequency and trust status /// 4. Creates Room entities for each unique room pub fn load_rooms(&mut self, window: &mut Window, cx: &mut Context) { - let client = &shared_state().client; - let Some(public_key) = Identity::get_global(cx).profile().map(|i| i.public_key()) else { - return; - }; + log::info!("Starting to load rooms from database..."); let task: Task, Error>> = cx.background_spawn(async move { + let client = shared_state().client(); + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; + // Get messages sent by the user let send = Filter::new() .kind(Kind::PrivateDirectMessage) @@ -206,7 +197,7 @@ impl ChatRegistry { // Check if room's author is seen in any contact list let filter = Filter::new().kind(Kind::ContactList).pubkey(event.pubkey); // If room's author is seen at least once, mark as trusted - is_trust = client.database().count(filter).await? >= 1; + is_trust = client.database().count(filter).await.unwrap_or(0) >= 1; if is_trust { trusted_keys.insert(event.pubkey); @@ -218,8 +209,9 @@ impl ChatRegistry { .kind(Kind::PrivateDirectMessage) .author(public_key) .pubkeys(public_keys); + // If current user has sent a message at least once, mark as ongoing - let is_ongoing = client.database().count(filter).await? >= 1; + let is_ongoing = client.database().count(filter).await.unwrap_or(1) >= 1; if is_ongoing { rooms.insert(Room::new(&event).kind(RoomKind::Ongoing)); @@ -234,33 +226,45 @@ impl ChatRegistry { }); cx.spawn_in(window, async move |this, cx| { - let rooms = task - .await - .expect("Failed to load chat rooms. Please restart the application."); - - this.update(cx, |this, cx| { - this.wait_for_eose = false; - this.rooms.extend( - rooms - .into_iter() - .sorted_by_key(|room| Reverse(room.created_at)) - .filter_map(|room| { - if !this.rooms.iter().any(|this| this.read(cx).id == room.id) { - Some(cx.new(|_| room)) - } else { - None - } - }) - .collect_vec(), - ); - - cx.notify(); - }) - .ok(); + match task.await { + Ok(rooms) => { + this.update(cx, |this, cx| { + this.extend_rooms(rooms, cx); + this.sort(cx); + }) + .ok(); + } + Err(e) => { + // TODO: push notification + log::error!("Failed to load rooms: {e}") + } + }; }) .detach(); } + pub(crate) fn extend_rooms(&mut self, rooms: BTreeSet, cx: &mut Context) { + let mut room_map: HashMap = HashMap::with_capacity(self.rooms.len()); + + for (index, room) in self.rooms.iter().enumerate() { + room_map.insert(room.read(cx).id, index); + } + + for new_room in rooms.into_iter() { + // Check if we already have a room with this ID + if let Some(&index) = room_map.get(&new_room.id) { + self.rooms[index].update(cx, |this, cx| { + *this = new_room; + cx.notify(); + }); + } else { + let new_index = self.rooms.len(); + room_map.insert(new_room.id, new_index); + self.rooms.push(cx.new(|_| new_room)); + } + } + } + /// Push a new Room to the global registry pub fn push_room(&mut self, room: Entity, cx: &mut Context) { let weak_room = if let Some(room) = self @@ -324,4 +328,9 @@ impl ChatRegistry { cx.notify(); } } + + pub fn set_loading(&mut self, status: bool, cx: &mut Context) { + self.loading = status; + cx.notify(); + } } diff --git a/crates/chats/src/room.rs b/crates/chats/src/room.rs index 24e650e..72b03ee 100644 --- a/crates/chats/src/room.rs +++ b/crates/chats/src/room.rs @@ -324,30 +324,18 @@ impl Room { /// /// # Returns /// - /// A Task that resolves to Result)>, Error> - #[allow(clippy::type_complexity)] + /// A Task that resolves to Result<(), Error> pub fn load_metadata(&self, cx: &mut Context) -> Task> { let public_keys = Arc::clone(&self.members); cx.background_spawn(async move { - for public_key in public_keys.iter() { - let metadata = shared_state() - .client - .database() - .metadata(*public_key) - .await?; + let database = shared_state().client().database(); - shared_state() - .persons - .write() - .await - .entry(*public_key) - .and_modify(|entry| { - if entry.is_none() { - *entry = metadata.clone(); - } - }) - .or_insert_with(|| metadata); + for public_key in public_keys.iter().cloned() { + if !shared_state().has_person(&public_key).await { + let metadata = database.metadata(public_key).await?; + shared_state().insert_person(public_key, metadata).await; + } } Ok(()) @@ -368,6 +356,7 @@ impl Room { let pubkeys = Arc::clone(&self.members); cx.background_spawn(async move { + let database = shared_state().client().database(); let mut result = Vec::with_capacity(pubkeys.len()); for pubkey in pubkeys.iter() { @@ -375,13 +364,7 @@ impl Room { .kind(Kind::InboxRelays) .author(*pubkey) .limit(1); - let is_ready = shared_state() - .client - .database() - .query(filter) - .await? - .first() - .is_some(); + let is_ready = database.query(filter).await?.first().is_some(); result.push((*pubkey, is_ready)); } @@ -410,11 +393,10 @@ impl Room { cx.background_spawn(async move { let mut messages = vec![]; let parser = NostrParser::new(); + let database = shared_state().client().database(); // Get all events from database - let events = shared_state() - .client - .database() + let events = database .query(filter) .await? .into_iter() @@ -637,7 +619,8 @@ impl Room { let backup = AppSettings::get_global(cx).settings.backup_messages; cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; let mut reports = vec![]; @@ -680,13 +663,11 @@ impl Room { }; for receiver in receivers.iter() { - if let Err(e) = shared_state() - .client + if let Err(e) = client .send_private_msg(*receiver, &content, tags.clone()) .await { - let metadata = shared_state() - .client + let metadata = client .database() .metadata(*receiver) .await? @@ -703,13 +684,11 @@ impl Room { // Only send a backup message to current user if there are no issues when sending to others if backup && reports.is_empty() { - if let Err(e) = shared_state() - .client + if let Err(e) = client .send_private_msg(*current_user, &content, tags.clone()) .await { - let metadata = shared_state() - .client + let metadata = client .database() .metadata(*current_user) .await? diff --git a/crates/client_keys/src/lib.rs b/crates/client_keys/src/lib.rs index 8bee67a..fcc790b 100644 --- a/crates/client_keys/src/lib.rs +++ b/crates/client_keys/src/lib.rs @@ -63,7 +63,7 @@ impl ClientKeys { this.set_keys(Some(keys), false, cx); }) .ok(); - } else if shared_state().first_run { + } else if shared_state().first_run() { // Generate a new keys and update this.update(cx, |this, cx| { this.new_keys(cx); diff --git a/crates/coop/src/chatspace.rs b/crates/coop/src/chatspace.rs index fa81958..1ac9c1f 100644 --- a/crates/coop/src/chatspace.rs +++ b/crates/coop/src/chatspace.rs @@ -151,6 +151,11 @@ impl ChatSpace { if !state.read(cx).has_profile() { this.open_onboarding(window, cx); } else { + // Load all chat rooms from database + ChatRegistry::global(cx).update(cx, |this, cx| { + this.load_rooms(window, cx); + }); + // Open chat panels this.open_chats(window, cx); } }, @@ -273,19 +278,14 @@ impl ChatSpace { fn verify_messaging_relays(&self, cx: &App) -> Task> { cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; let filter = Filter::new() .kind(Kind::InboxRelays) .author(public_key) .limit(1); - let is_exist = shared_state() - .client - .database() - .query(filter) - .await? - .first() - .is_some(); + let is_exist = client.database().query(filter).await?.first().is_some(); Ok(is_exist) }) diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index 05ffda8..404db26 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use asset::Assets; use auto_update::AutoUpdater; use chats::ChatRegistry; -use global::constants::APP_ID; #[cfg(not(target_os = "linux"))] use global::constants::APP_NAME; +use global::constants::{ALL_MESSAGES_SUB_ID, APP_ID}; use global::{shared_state, NostrSignal}; use gpui::{ actions, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, @@ -15,6 +15,7 @@ use gpui::{ use gpui::{point, SharedString, TitlebarOptions}; #[cfg(target_os = "linux")] use gpui::{WindowBackgroundAppearance, WindowDecorations}; +use nostr_sdk::SubscriptionId; use theme::Theme; use ui::Root; @@ -28,17 +29,19 @@ fn main() { // Initialize logging tracing_subscriber::fmt::init(); - // Initialize the Global State and process events in a separate thread. - // Must be run under async utility runtime - nostr_sdk::async_utility::task::spawn(async move { - shared_state().start().await; - }); - // Initialize the Application let app = Application::new() .with_assets(Assets) .with_http_client(Arc::new(reqwest_client::ReqwestClient::new())); + // Initialize the Global State and process events in a separate thread. + app.background_executor() + .spawn(async move { + shared_state().start().await; + }) + .detach(); + + // Run application app.run(move |cx| { // Register the `quit` function cx.on_action(quit); @@ -100,42 +103,44 @@ fn main() { // Initialize chat state chats::init(cx); - // Initialize chatspace (or workspace) - let chatspace = chatspace::init(window, cx); - let async_chatspace = chatspace.downgrade(); - // Spawn a task to handle events from nostr channel cx.spawn_in(window, async move |_, cx| { - while let Ok(signal) = shared_state().global_receiver.recv().await { + let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID); + + while let Ok(signal) = shared_state().signal().recv().await { cx.update(|window, cx| { let chats = ChatRegistry::global(cx); let auto_updater = AutoUpdater::global(cx); match signal { - NostrSignal::SignerUpdated => { - async_chatspace - .update(cx, |this, cx| { - this.open_chats(window, cx); - }) - .ok(); - } - NostrSignal::SignerUnset => { - async_chatspace - .update(cx, |this, cx| { - this.open_onboarding(window, cx); - }) - .ok(); - } - NostrSignal::Eose => { - chats.update(cx, |this, cx| { - this.load_rooms(window, cx); - }); - } NostrSignal::Event(event) => { chats.update(cx, |this, cx| { this.event_to_message(event, window, cx); }); } + // Load chat rooms and stop the loading status + NostrSignal::Finish => { + chats.update(cx, |this, cx| { + this.load_rooms(window, cx); + this.set_loading(false, cx); + }); + } + // Load chat rooms without setting as finished + NostrSignal::PartialFinish => { + chats.update(cx, |this, cx| { + this.load_rooms(window, cx); + }); + } + NostrSignal::Eose(subscription_id) => { + if subscription_id == all_messages_sub_id { + chats.update(cx, |this, cx| { + this.load_rooms(window, cx); + }); + } + } + NostrSignal::Notice(_msg) => { + // window.push_notification(msg, cx); + } NostrSignal::AppUpdate(event) => { auto_updater.update(cx, |this, cx| { this.update(event, cx); @@ -148,7 +153,7 @@ fn main() { }) .detach(); - Root::new(chatspace.into(), window, cx) + Root::new(chatspace::init(window, cx).into(), window, cx) }) }) .expect("Failed to open window. Please restart the application."); diff --git a/crates/coop/src/views/chat.rs b/crates/coop/src/views/chat.rs index a6f7b61..36041c2 100644 --- a/crates/coop/src/views/chat.rs +++ b/crates/coop/src/views/chat.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; -use async_utility::task::spawn; use chats::message::Message; use chats::room::{Room, RoomKind, SendError}; use common::nip96_upload; @@ -391,8 +390,8 @@ impl Chat { let (tx, rx) = oneshot::channel::>(); // Spawn task via async utility instead of GPUI context - spawn(async move { - let url = match nip96_upload(&shared_state().client, nip96, file_data) + nostr_sdk::async_utility::task::spawn(async move { + let url = match nip96_upload(shared_state().client(), nip96, file_data) .await { Ok(url) => Some(url), diff --git a/crates/coop/src/views/compose.rs b/crates/coop/src/views/compose.rs index 4da58e9..aab2717 100644 --- a/crates/coop/src/views/compose.rs +++ b/crates/coop/src/views/compose.rs @@ -69,13 +69,10 @@ impl Compose { cx.spawn(async move |this, cx| { let task: Task, Error>> = cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; - let profiles = shared_state() - .client - .database() - .contacts(public_key) - .await?; + let profiles = client.database().contacts(public_key).await?; Ok(profiles) }); @@ -134,7 +131,7 @@ impl Compose { let tags = Tags::from_list(tag_list); let event: Task> = cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let signer = shared_state().client().signer().await?; let public_key = signer.get_public_key().await?; // [IMPORTANT] @@ -184,7 +181,7 @@ impl Compose { let public_key = profile.public_key; let metadata = shared_state() - .client + .client() .fetch_metadata(public_key, Duration::from_secs(2)) .await? .unwrap_or_default(); @@ -200,7 +197,7 @@ impl Compose { cx.background_spawn(async move { let metadata = shared_state() - .client + .client() .fetch_metadata(public_key, Duration::from_secs(2)) .await? .unwrap_or_default(); diff --git a/crates/coop/src/views/login.rs b/crates/coop/src/views/login.rs index 42b6646..cadfb90 100644 --- a/crates/coop/src/views/login.rs +++ b/crates/coop/src/views/login.rs @@ -158,11 +158,9 @@ impl Login { subscriptions.push( cx.observe_in(&active_signer, window, |this, entity, window, cx| { - if let Some(mut signer) = entity.read(cx).clone() { - // Automatically open auth url - signer.auth_url_handler(CoopAuthUrlHandler); + if let Some(signer) = entity.read(cx).as_ref() { // Wait for connection from remote signer - this.wait_for_connection(signer, window, cx); + this.wait_for_connection(signer.to_owned(), window, cx); } }), ); @@ -284,11 +282,7 @@ impl Login { }; if let Some(secret_key) = secret_key { - // Active signer is no longer needed - self.shutdown_active_signer(cx); - let keys = Keys::new(secret_key); - Identity::global(cx).update(cx, |this, cx| { this.write_keys(&keys, password, cx); this.set_signer(keys, window, cx); @@ -312,9 +306,6 @@ impl Login { return; }; - // Active signer is no longer needed - self.shutdown_active_signer(cx); - // Automatically open auth url signer.auth_url_handler(CoopAuthUrlHandler); @@ -359,10 +350,14 @@ impl Login { let (tx, rx) = oneshot::channel::>(); cx.background_spawn(async move { - if let Ok(bunker_uri) = signer.bunker_uri().await { - tx.send(Some((bunker_uri, signer))).ok(); - } else { - tx.send(None).ok(); + match signer.bunker_uri().await { + Ok(bunker_uri) => { + tx.send(Some((bunker_uri, signer))).ok(); + } + Err(e) => { + log::error!("Nostr Connect (Client): {e}"); + tx.send(None).ok(); + } } }) .detach(); @@ -378,9 +373,9 @@ impl Login { .ok(); } else { cx.update(|window, cx| { - window.push_notification(Notification::error("Connection failed"), cx); // Refresh the active signer this.update(cx, |this, cx| { + window.push_notification(Notification::error("Connection failed"), cx); this.change_relay(window, cx); }) .ok(); @@ -407,15 +402,6 @@ impl Login { }); } - fn shutdown_active_signer(&self, cx: &Context) { - if let Some(signer) = self.active_signer.read(cx).clone() { - cx.background_spawn(async move { - signer.shutdown().await; - }) - .detach(); - } - } - fn set_error(&mut self, message: impl Into, cx: &mut Context) { self.set_logging_in(false, cx); self.error.update(cx, |this, cx| { diff --git a/crates/coop/src/views/new_account.rs b/crates/coop/src/views/new_account.rs index fec6d51..da8f7f6 100644 --- a/crates/coop/src/views/new_account.rs +++ b/crates/coop/src/views/new_account.rs @@ -1,4 +1,3 @@ -use async_utility::task::spawn; use common::nip96_upload; use global::shared_state; use gpui::prelude::FluentBuilder; @@ -157,9 +156,9 @@ impl NewAccount { if let Ok(file_data) = fs::read(path).await { let (tx, rx) = oneshot::channel::(); - spawn(async move { + nostr_sdk::async_utility::task::spawn(async move { if let Ok(url) = - nip96_upload(&shared_state().client, nip96, file_data).await + nip96_upload(shared_state().client(), nip96, file_data).await { _ = tx.send(url); } diff --git a/crates/coop/src/views/onboarding.rs b/crates/coop/src/views/onboarding.rs index 78d882d..4624f65 100644 --- a/crates/coop/src/views/onboarding.rs +++ b/crates/coop/src/views/onboarding.rs @@ -45,18 +45,14 @@ impl Onboarding { let local_account = cx.new(|_| None); let task = cx.background_spawn(async move { + let database = shared_state().client().database(); + let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(ACCOUNT_D) .limit(1); - if let Some(event) = shared_state() - .client - .database() - .query(filter) - .await? - .first_owned() - { + if let Some(event) = database.query(filter).await?.first_owned() { let public_key = event .tags .public_keys() @@ -65,14 +61,7 @@ impl Onboarding { .first() .cloned() .unwrap(); - - let metadata = shared_state() - .client - .database() - .metadata(public_key) - .await? - .unwrap_or_default(); - + let metadata = database.metadata(public_key).await?.unwrap_or_default(); let profile = Profile::new(public_key, metadata); Ok(profile) diff --git a/crates/coop/src/views/profile.rs b/crates/coop/src/views/profile.rs index 05f6e5b..babaa24 100644 --- a/crates/coop/src/views/profile.rs +++ b/crates/coop/src/views/profile.rs @@ -1,7 +1,6 @@ use std::str::FromStr; use std::time::Duration; -use async_utility::task::spawn; use common::nip96_upload; use global::shared_state; use gpui::prelude::FluentBuilder; @@ -56,10 +55,10 @@ impl Profile { }; let task: Task, Error>> = cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; - let metadata = shared_state() - .client + let metadata = client .fetch_metadata(public_key, Duration::from_secs(2)) .await?; @@ -124,9 +123,9 @@ impl Profile { if let Ok(file_data) = fs::read(path).await { let (tx, rx) = oneshot::channel::(); - spawn(async move { + nostr_sdk::async_utility::task::spawn(async move { if let Ok(url) = - nip96_upload(&shared_state().client, nip96, file_data).await + nip96_upload(shared_state().client(), nip96, file_data).await { _ = tx.send(url); } @@ -193,7 +192,7 @@ impl Profile { } let task: Task> = cx.background_spawn(async move { - let _ = shared_state().client.set_metadata(&new_metadata).await?; + let _ = shared_state().client().set_metadata(&new_metadata).await?; Ok(()) }); diff --git a/crates/coop/src/views/relays.rs b/crates/coop/src/views/relays.rs index 1c46a41..6f963fd 100644 --- a/crates/coop/src/views/relays.rs +++ b/crates/coop/src/views/relays.rs @@ -35,20 +35,15 @@ impl Relays { let input = cx.new(|cx| InputState::new(window, cx).placeholder("wss://example.com")); let relays = cx.new(|cx| { let task: Task, Error>> = cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; let filter = Filter::new() .kind(Kind::InboxRelays) .author(public_key) .limit(1); - if let Some(event) = shared_state() - .client - .database() - .query(filter) - .await? - .first_owned() - { + if let Some(event) = client.database().query(filter).await?.first_owned() { let relays = event .tags .filter(TagKind::Relay) @@ -111,23 +106,18 @@ impl Relays { let relays = self.relays.read(cx).clone(); let task: Task> = cx.background_spawn(async move { - let signer = shared_state().client.signer().await?; + let client = shared_state().client(); + let signer = client.signer().await?; let public_key = signer.get_public_key().await?; // If user didn't have any NIP-65 relays, add default ones - if shared_state() - .client - .database() - .relay_list(public_key) - .await? - .is_empty() - { + if client.database().relay_list(public_key).await?.is_empty() { let builder = EventBuilder::relay_list(vec![ (RelayUrl::parse("wss://relay.damus.io/").unwrap(), None), (RelayUrl::parse("wss://relay.primal.net/").unwrap(), None), ]); - if let Err(e) = shared_state().client.send_event_builder(builder).await { + if let Err(e) = client.send_event_builder(builder).await { log::error!("Failed to send relay list event: {}", e); } } @@ -138,22 +128,21 @@ impl Relays { .collect(); let builder = EventBuilder::new(Kind::InboxRelays, "").tags(tags); - let output = shared_state().client.send_event_builder(builder).await?; + let output = client.send_event_builder(builder).await?; // Connect to messaging relays for relay in relays.into_iter() { - _ = shared_state().client.add_relay(&relay).await; - _ = shared_state().client.connect_relay(&relay).await; + _ = client.add_relay(&relay).await; + _ = client.connect_relay(&relay).await; } let sub_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID); // Close old subscription - shared_state().client.unsubscribe(&sub_id).await; + client.unsubscribe(&sub_id).await; // Subscribe to new messages - if let Err(e) = shared_state() - .client + if let Err(e) = client .subscribe_with_id( sub_id, Filter::new() diff --git a/crates/coop/src/views/sidebar/mod.rs b/crates/coop/src/views/sidebar/mod.rs index 07ce870..4169886 100644 --- a/crates/coop/src/views/sidebar/mod.rs +++ b/crates/coop/src/views/sidebar/mod.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use std::ops::Range; use std::time::Duration; -use async_utility::task::spawn; use chats::room::{Room, RoomKind}; use chats::{ChatRegistry, RoomEmitter}; use common::debounced_delay::DebouncedDelay; @@ -12,10 +11,10 @@ use global::constants::{DEFAULT_MODAL_WIDTH, SEARCH_RELAYS}; use global::shared_state; use gpui::prelude::FluentBuilder; use gpui::{ - div, px, rems, uniform_list, AnyElement, App, AppContext, ClipboardItem, Context, Entity, - EventEmitter, FocusHandle, Focusable, InteractiveElement, IntoElement, ParentElement, Render, - RetainAllImageCache, SharedString, StatefulInteractiveElement, Styled, Subscription, Task, - Window, + div, px, relative, rems, uniform_list, AnyElement, App, AppContext, ClipboardItem, Context, + Entity, EventEmitter, FocusHandle, Focusable, InteractiveElement, IntoElement, ParentElement, + Render, RetainAllImageCache, SharedString, StatefulInteractiveElement, Styled, Subscription, + Task, Window, }; use identity::Identity; use itertools::Itertools; @@ -26,6 +25,7 @@ use theme::ActiveTheme; use ui::avatar::Avatar; use ui::button::{Button, ButtonRounded, ButtonVariants}; use ui::dock_area::panel::{Panel, PanelEvent}; +use ui::indicator::Indicator; use ui::input::{InputEvent, InputState, TextInput}; use ui::popup_menu::PopupMenu; use ui::skeleton::Skeleton; @@ -145,13 +145,14 @@ impl Sidebar { let query = self.find_input.read(cx).value().clone(); cx.background_spawn(async move { + let client = shared_state().client(); + let filter = Filter::new() .kind(Kind::Metadata) .search(query.to_lowercase()) .limit(FIND_LIMIT); - let events = shared_state() - .client + let events = client .fetch_events_from(SEARCH_RELAYS, filter, Duration::from_secs(3)) .await? .into_iter() @@ -161,12 +162,8 @@ impl Sidebar { let mut rooms = BTreeSet::new(); let (tx, rx) = smol::channel::bounded::(10); - spawn(async move { - let signer = shared_state() - .client - .signer() - .await - .expect("signer is required"); + nostr_sdk::async_utility::task::spawn(async move { + let signer = client.signer().await.expect("signer is required"); let public_key = signer.get_public_key().await.expect("error"); for event in events.into_iter() { @@ -349,7 +346,46 @@ impl Sidebar { }); } - fn render_account(&self, profile: &Profile, cx: &Context) -> impl IntoElement { + fn open_loading_modal(&self, window: &mut Window, cx: &mut Context) { + window.open_modal(cx, move |this, _window, cx| { + const BODY_1: &str = + "Coop is downloading all your messages from the messaging relays. \ + Depending on your total number of messages, this process may take up to \ + 15 minutes if you're using Nostr Connect."; + const BODY_2: &str = + "Please be patient - you only need to do this full download once. \ + Next time, Coop will only download new messages."; + const DESCRIPTION: &str = "You still can use the app normally \ + while messages are processing in the background"; + + this.child( + div() + .pt_8() + .pb_4() + .px_4() + .flex() + .flex_col() + .gap_2() + .child( + div() + .flex() + .flex_col() + .gap_2() + .text_sm() + .child(BODY_1) + .child(BODY_2), + ) + .child( + div() + .text_xs() + .text_color(cx.theme().text_muted) + .child(DESCRIPTION), + ), + ) + }); + } + + fn account(&self, profile: &Profile, cx: &Context) -> impl IntoElement { let proxy = AppSettings::get_global(cx).settings.proxy_user_avatars; div() @@ -396,7 +432,7 @@ impl Sidebar { ) } - fn render_skeleton(&self, total: i32) -> impl IntoIterator { + fn skeletons(&self, total: i32) -> impl IntoIterator { (0..total).map(|_| { div() .h_9() @@ -406,7 +442,14 @@ impl Sidebar { .items_center() .gap_2() .child(Skeleton::new().flex_shrink_0().size_6().rounded_full()) - .child(Skeleton::new().w_40().h_4().rounded_sm()) + .child( + div() + .flex_1() + .flex() + .justify_between() + .child(Skeleton::new().w_32().h_2p5().rounded_sm()) + .child(Skeleton::new().w_6().h_2p5().rounded_sm()), + ) }) } @@ -473,7 +516,7 @@ impl Focusable for Sidebar { impl Render for Sidebar { fn render(&mut self, _window: &mut Window, cx: &mut Context) -> impl IntoElement { let chats = ChatRegistry::get_global(cx); - + // Get rooms from either search results or the chat registry let rooms = if let Some(results) = self.local_result.read(cx) { results.to_owned() } else { @@ -488,12 +531,13 @@ impl Render for Sidebar { div() .image_cache(self.image_cache.clone()) .size_full() + .relative() .flex() .flex_col() .gap_3() // Account .when_some(Identity::get_global(cx).profile(), |this, profile| { - this.child(self.render_account(&profile, cx)) + this.child(self.account(&profile, cx)) }) // Search Input .child( @@ -528,6 +572,7 @@ impl Render for Sidebar { items })) }) + // Chat Rooms .child( div() .px_2() @@ -623,13 +668,14 @@ impl Render for Sidebar { ) }), ) - .when(chats.wait_for_eose, |this| { + .when(chats.loading, |this| { this.child( div() + .flex_1() .flex() .flex_col() .gap_1() - .children(self.render_skeleton(10)), + .children(self.skeletons(1)), ) }) .child( @@ -643,5 +689,61 @@ impl Render for Sidebar { .h_full(), ), ) + .when(chats.loading, |this| { + this.child( + div().absolute().bottom_4().px_4().child( + div() + .p_1() + .w_full() + .rounded_full() + .flex() + .items_center() + .justify_between() + .bg(cx.theme().panel_background) + .shadow_sm() + // Empty div + .child(div().size_6().flex_shrink_0()) + // Loading indicator + .child( + div() + .flex_1() + .flex() + .flex_col() + .items_center() + .justify_center() + .text_xs() + .text_center() + .child( + div() + .font_semibold() + .flex() + .items_center() + .gap_1() + .line_height(relative(1.2)) + .child(Indicator::new().xsmall()) + .child("Retrieving messages..."), + ) + .child( + div() + .text_color(cx.theme().text_muted) + .child("This may take some time"), + ), + ) + // Info button + .child( + Button::new("help") + .icon(IconName::Info) + .tooltip("Why you're seeing this") + .small() + .ghost() + .rounded(ButtonRounded::Full) + .flex_shrink_0() + .on_click(cx.listener(move |this, _, window, cx| { + this.open_loading_modal(window, cx) + })), + ), + ), + ) + }) } } diff --git a/crates/global/src/constants.rs b/crates/global/src/constants.rs index d1171c6..0412d5c 100644 --- a/crates/global/src/constants.rs +++ b/crates/global/src/constants.rs @@ -31,8 +31,8 @@ pub const SEARCH_RELAYS: [&str; 1] = ["wss://relay.nostr.band"]; /// Default relay for Nostr Connect pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; -/// Default timeout for Nostr Connect -pub const NOSTR_CONNECT_TIMEOUT: u64 = 300; +/// Default timeout (in seconds) for Nostr Connect +pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; /// Unique ID for new message subscription. pub const NEW_MESSAGE_SUB_ID: &str = "listen_new_giftwraps"; @@ -40,9 +40,9 @@ pub const NEW_MESSAGE_SUB_ID: &str = "listen_new_giftwraps"; pub const ALL_MESSAGES_SUB_ID: &str = "listen_all_giftwraps"; /// Total metadata requests will be grouped. -pub const METADATA_BATCH_LIMIT: usize = 200; +pub const METADATA_BATCH_LIMIT: usize = 100; /// Maximum timeout for grouping metadata requests. -pub const METADATA_BATCH_TIMEOUT: u64 = 300; +pub const METADATA_BATCH_TIMEOUT: u64 = 400; /// Default width for all modals. pub const DEFAULT_MODAL_WIDTH: f32 = 420.; @@ -56,4 +56,4 @@ pub const IMAGE_RESIZE_SERVICE: &str = "https://wsrv.nl"; pub const NIP96_SERVER: &str = "https://nostrmedia.com"; pub(crate) const GLOBAL_CHANNEL_LIMIT: usize = 2048; -pub(crate) const BATCH_CHANNEL_LIMIT: usize = 1024; +pub(crate) const BATCH_CHANNEL_LIMIT: usize = 2048; diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index 7fd8a4f..3886a9f 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -8,9 +8,11 @@ use constants::{ ALL_MESSAGES_SUB_ID, APP_ID, APP_PUBKEY, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT, NEW_MESSAGE_SUB_ID, SEARCH_RELAYS, }; +use nostr_connect::prelude::*; use nostr_sdk::prelude::*; use paths::nostr_file; use smol::lock::RwLock; +use smol::Task; use crate::constants::{BATCH_CHANNEL_LIMIT, GLOBAL_CHANNEL_LIMIT}; use crate::paths::support_dir; @@ -24,36 +26,42 @@ static GLOBALS: OnceLock = OnceLock::new(); /// Signals sent through the global event channel to notify UI components #[derive(Debug)] pub enum NostrSignal { - /// User's signing keys have been updated - SignerUpdated, - /// User's signing keys have been unset - SignerUnset, - /// New Nostr event received + /// New gift wrap event received Event(Event), + /// Finished processing all gift wrap events + Finish, + /// Partially finished processing all gift wrap events + PartialFinish, + /// Receives EOSE response from relay pool + Eose(SubscriptionId), + /// Notice from Relay Pool + Notice(String), /// Application update event received AppUpdate(Event), - /// End of stored events received from relay - Eose, } /// Global application state containing Nostr client and shared resources pub struct Globals { /// The Nostr SDK client - pub client: Client, + client: Client, + /// Determines if this is the first time user run Coop - pub first_run: bool, - /// Auto-close options for subscriptions - pub auto_close: Option, - /// Channel sender for broadcasting global Nostr events to UI - pub global_sender: smol::channel::Sender, - /// Channel receiver for handling global Nostr events - pub global_receiver: smol::channel::Receiver, - /// Channel sender for batching public keys for metadata fetching - pub batch_sender: smol::channel::Sender>, - /// Channel receiver for processing batched public key requests - pub batch_receiver: smol::channel::Receiver>, + first_run: bool, + /// Cache of user profiles mapped by their public keys - pub persons: RwLock>>, + persons: RwLock>>, + + /// Channel sender for broadcasting global Nostr events to UI + global_sender: smol::channel::Sender, + + /// Channel receiver for handling global Nostr events + global_receiver: smol::channel::Receiver, + + batch_sender: smol::channel::Sender, + batch_receiver: smol::channel::Receiver, + + event_sender: smol::channel::Sender, + event_receiver: smol::channel::Receiver, } /// Returns the global singleton instance, initializing it if necessary @@ -74,19 +82,20 @@ pub fn shared_state() -> &'static Globals { smol::channel::bounded::(GLOBAL_CHANNEL_LIMIT); let (batch_sender, batch_receiver) = - smol::channel::bounded::>(BATCH_CHANNEL_LIMIT); + smol::channel::bounded::(BATCH_CHANNEL_LIMIT); + + let (event_sender, event_receiver) = smol::channel::unbounded::(); Globals { client: ClientBuilder::default().database(lmdb).opts(opts).build(), persons: RwLock::new(BTreeMap::new()), - auto_close: Some( - SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE), - ), first_run, global_sender, global_receiver, batch_sender, batch_receiver, + event_sender, + event_receiver, } }) } @@ -95,72 +104,13 @@ impl Globals { /// Starts the global event processing system and metadata batching pub async fn start(&self) { self.connect().await; - self.subscribe_for_app_updates().await; self.preload_metadata().await; - - nostr_sdk::async_utility::task::spawn(async move { - let mut batch: BTreeSet = BTreeSet::new(); - let timeout_duration = Duration::from_millis(METADATA_BATCH_TIMEOUT); - - loop { - let timeout = smol::Timer::after(timeout_duration); - - /// Internal events for the metadata batching system - enum BatchEvent { - /// New public keys to add to the batch - NewKeys(Vec), - /// Timeout reached, process current batch - Timeout, - /// Channel was closed, shutdown gracefully - ChannelClosed, - } - - let event = smol::future::or( - async { - match shared_state().batch_receiver.recv().await { - Ok(public_keys) => BatchEvent::NewKeys(public_keys), - Err(_) => BatchEvent::ChannelClosed, - } - }, - async { - timeout.await; - BatchEvent::Timeout - }, - ) - .await; - - match event { - BatchEvent::NewKeys(public_keys) => { - batch.extend(public_keys); - - // Process immediately if batch limit reached - if batch.len() >= METADATA_BATCH_LIMIT { - shared_state() - .sync_data_for_pubkeys(mem::take(&mut batch)) - .await; - } - } - BatchEvent::Timeout => { - // Process current batch if not empty - if !batch.is_empty() { - shared_state() - .sync_data_for_pubkeys(mem::take(&mut batch)) - .await; - } - } - BatchEvent::ChannelClosed => { - // Process remaining batch and exit - if !batch.is_empty() { - shared_state().sync_data_for_pubkeys(batch).await; - } - break; - } - } - } - }); + self.subscribe_for_app_updates().await; + self.batching_metadata().detach(); // .detach() to keep running in background let mut notifications = self.client.notifications(); let mut processed_events: BTreeSet = BTreeSet::new(); + let new_messages_sub_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID); while let Ok(notification) = notifications.recv().await { if let RelayPoolNotification::Message { message, .. } = notification { @@ -177,10 +127,18 @@ impl Globals { match event.kind { Kind::GiftWrap => { - self.unwrap_event(&subscription_id, &event).await; + if *subscription_id == new_messages_sub_id + || self + .event_sender + .send(event.clone().into_owned()) + .await + .is_err() + { + self.unwrap_event(&event, true).await; + } } Kind::Metadata => { - self.insert_person(&event).await; + self.insert_person_from_event(&event).await; } Kind::ContactList => { self.extract_pubkeys_and_sync(&event).await; @@ -192,9 +150,8 @@ impl Globals { } } RelayMessage::EndOfStoredEvents(subscription_id) => { - if *subscription_id == SubscriptionId::new(ALL_MESSAGES_SUB_ID) { - self.global_sender.send(NostrSignal::Eose).await.ok(); - } + self.send_signal(NostrSignal::Eose(subscription_id.into_owned())) + .await; } _ => {} } @@ -202,19 +159,147 @@ impl Globals { } } - pub async fn unset_signer(&self) { - self.client.reset().await; + /// Gets a reference to the Nostr Client instance + pub fn client(&'static self) -> &'static Client { + &self.client + } - if let Ok(signer) = self.client.signer().await { - if let Ok(public_key) = signer.get_public_key().await { - let file = support_dir().join(format!(".{}", public_key.to_bech32().unwrap())); - fs::remove_file(&file).ok(); + /// Returns whether this is the first time the application has been run + pub fn first_run(&self) -> bool { + self.first_run + } + + /// Gets the global signal receiver + pub fn signal(&self) -> smol::channel::Receiver { + self.global_receiver.clone() + } + + /// Sends a signal through the global channel to notify GPUI + /// + /// # Arguments + /// * `signal` - The [`NostrSignal`] to send to GPUI + /// + /// # Examples + /// ``` + /// shared_state().send_signal(NostrSignal::Finish).await; + /// ``` + pub async fn send_signal(&self, signal: NostrSignal) { + if let Err(e) = self.global_sender.send(signal).await { + log::error!("Failed to send signal: {e}") + } + } + + /// Batch metadata requests. Combine all requests from multiple authors into single filter + pub(crate) fn batching_metadata(&self) -> Task<()> { + smol::spawn(async move { + let duration = Duration::from_millis(METADATA_BATCH_TIMEOUT); + let mut batch: BTreeSet = BTreeSet::new(); + + loop { + let timeout = smol::Timer::after(duration); + /// Internal events for the metadata batching system + enum BatchEvent { + NewKeys(PublicKey), + Timeout, + Closed, + } + + let event = smol::future::or( + async { + if let Ok(public_key) = shared_state().batch_receiver.recv().await { + BatchEvent::NewKeys(public_key) + } else { + BatchEvent::Closed + } + }, + async { + timeout.await; + BatchEvent::Timeout + }, + ) + .await; + + match event { + BatchEvent::NewKeys(public_key) => { + batch.insert(public_key); + // Process immediately if batch limit reached + if batch.len() >= METADATA_BATCH_LIMIT { + shared_state() + .sync_data_for_pubkeys(mem::take(&mut batch)) + .await; + } + } + BatchEvent::Timeout => { + if !batch.is_empty() { + shared_state() + .sync_data_for_pubkeys(mem::take(&mut batch)) + .await; + } + } + BatchEvent::Closed => { + if !batch.is_empty() { + shared_state() + .sync_data_for_pubkeys(mem::take(&mut batch)) + .await; + } + break; + } + } } - } + }) + } - if let Err(e) = self.global_sender.send(NostrSignal::SignerUnset).await { - log::error!("Failed to send signal to global channel: {}", e); - } + /// Process to unwrap the gift wrapped events + pub(crate) fn process_gift_wrap_events(&self) -> Task<()> { + smol::spawn(async move { + let timeout_duration = Duration::from_secs(75); // 75 secs + let mut counter = 0; + + loop { + // Signer is unset, probably user is not ready to retrieve gift wrap events + if shared_state().client.signer().await.is_err() { + continue; + } + + let timeout = smol::Timer::after(timeout_duration); + + // TODO: Find a way to make this code prettier + let event = smol::future::or( + async { (shared_state().event_receiver.recv().await).ok() }, + async { + timeout.await; + None + }, + ) + .await; + + match event { + Some(event) => { + // Process the gift wrap event unwrapping + let is_cached = shared_state().unwrap_event(&event, false).await; + + // Increment the total messages counter if message is not from cache + if !is_cached { + counter += 1; + } + + // Send partial finish signal to GPUI + if counter >= 20 { + shared_state().send_signal(NostrSignal::PartialFinish).await; + // Reset counter + counter = 0; + } + } + None => { + shared_state().send_signal(NostrSignal::Finish).await; + break; + } + } + } + + // Event channel is no longer needed when all gift wrap events have been processed + shared_state().event_receiver.close(); + }) } /// Gets a person's profile from cache or creates default (blocking) @@ -239,6 +324,41 @@ impl Globals { Profile::new(*public_key, metadata) } + /// Check if a person exists or not + pub async fn has_person(&self, public_key: &PublicKey) -> bool { + self.persons.read().await.contains_key(public_key) + } + + /// Inserts or updates a person's metadata + pub async fn insert_person(&self, public_key: PublicKey, metadata: Option) { + self.persons + .write() + .await + .entry(public_key) + .and_modify(|entry| { + if entry.is_none() { + *entry = metadata.clone(); + } + }) + .or_insert_with(|| metadata); + } + + /// Inserts or updates a person's metadata from a Kind::Metadata event + pub(crate) async fn insert_person_from_event(&self, event: &Event) { + let metadata = Metadata::from_json(&event.content).ok(); + + self.persons + .write() + .await + .entry(event.pubkey) + .and_modify(|entry| { + if entry.is_none() { + *entry = metadata.clone(); + } + }) + .or_insert_with(|| metadata); + } + /// Connects to bootstrap and configured relays pub(crate) async fn connect(&self) { for relay in BOOTSTRAP_RELAYS.into_iter() { @@ -261,67 +381,79 @@ impl Globals { /// Subscribes to user-specific data feeds (DMs, mentions, etc.) pub async fn subscribe_for_user_data(&self, public_key: PublicKey) { - let metadata = Filter::new() - .kinds(vec![ - Kind::Metadata, - Kind::ContactList, - Kind::InboxRelays, - Kind::MuteList, - Kind::SimpleGroups, - ]) - .author(public_key) - .limit(10); - - let data = Filter::new() - .author(public_key) - .kinds(vec![ - Kind::Metadata, - Kind::ContactList, - Kind::MuteList, - Kind::SimpleGroups, - Kind::InboxRelays, - Kind::RelayList, - ]) - .since(Timestamp::now()); - - let msg = Filter::new().kind(Kind::GiftWrap).pubkey(public_key); - let new_msg = Filter::new() - .kind(Kind::GiftWrap) - .pubkey(public_key) - .limit(0); - let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID); let new_messages_sub_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID); - let opts = shared_state().auto_close; - - self.client.subscribe(data, None).await.ok(); + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); self.client - .subscribe(metadata, shared_state().auto_close) + .subscribe( + Filter::new() + .author(public_key) + .kinds(vec![ + Kind::Metadata, + Kind::ContactList, + Kind::MuteList, + Kind::SimpleGroups, + Kind::InboxRelays, + Kind::RelayList, + ]) + .since(Timestamp::now()), + None, + ) .await .ok(); self.client - .subscribe_with_id(all_messages_sub_id, msg, opts) + .subscribe( + Filter::new() + .kinds(vec![ + Kind::Metadata, + Kind::ContactList, + Kind::InboxRelays, + Kind::MuteList, + Kind::SimpleGroups, + ]) + .author(public_key) + .limit(10), + Some(SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE)), + ) .await .ok(); self.client - .subscribe_with_id(new_messages_sub_id, new_msg, None) + .subscribe_with_id( + all_messages_sub_id, + Filter::new().kind(Kind::GiftWrap).pubkey(public_key), + Some(opts), + ) .await .ok(); - log::info!("Subscribing to user's metadata..."); + self.client + .subscribe_with_id( + new_messages_sub_id, + Filter::new() + .kind(Kind::GiftWrap) + .pubkey(public_key) + .limit(0), + None, + ) + .await + .ok(); + + log::info!("Getting all user's metadata and messages..."); + // Process gift-wrapped events in the background + self.process_gift_wrap_events().detach(); } /// Subscribes to application update notifications pub(crate) async fn subscribe_for_app_updates(&self) { + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); let coordinate = Coordinate { kind: Kind::Custom(32267), public_key: PublicKey::from_hex(APP_PUBKEY).expect("App Pubkey is invalid"), identifier: APP_ID.into(), }; - let filter = Filter::new() .kind(Kind::ReleaseArtifactSet) .coordinate(&coordinate) @@ -329,20 +461,20 @@ impl Globals { if let Err(e) = self .client - .subscribe_to(BOOTSTRAP_RELAYS, filter, shared_state().auto_close) + .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts)) .await { log::error!("Failed to subscribe for app updates: {}", e); } - log::info!("Subscribing to app updates..."); + log::info!("Subscribed to app updates"); } pub(crate) async fn preload_metadata(&self) { let filter = Filter::new().kind(Kind::Metadata).limit(100); if let Ok(events) = self.client.database().query(filter).await { for event in events.into_iter() { - self.insert_person(&event).await; + self.insert_person_from_event(&event).await; } } } @@ -370,54 +502,71 @@ impl Globals { pub(crate) async fn get_unwrapped(&self, target: EventId) -> Result { let filter = Filter::new() .kind(Kind::ApplicationSpecificData) + .identifier(target) .event(target) .limit(1); if let Some(event) = self.client.database().query(filter).await?.first_owned() { Ok(Event::from_json(event.content)?) } else { - Err(anyhow!("Event not found")) + Err(anyhow!("Event is not cached yet")) } } - /// Unwraps a gift-wrapped event and processes its contents - pub(crate) async fn unwrap_event(&self, subscription_id: &SubscriptionId, event: &Event) { - let new_messages_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID); - let random_keys = Keys::generate(); + /// Unwraps a gift-wrapped event and processes its contents. + /// + /// # Arguments + /// * `event` - The gift-wrapped event to unwrap + /// * `incoming` - Whether this is a newly received event (true) or old + /// + /// # Returns + /// Returns `true` if the event was successfully loaded from cache or saved after unwrapping. + pub(crate) async fn unwrap_event(&self, event: &Event, incoming: bool) -> bool { + let mut is_cached = false; let event = match self.get_unwrapped(event.id).await { - Ok(event) => event, - Err(_) => match self.client.unwrap_gift_wrap(event).await { - Ok(unwrap) => match unwrap.rumor.sign_with_keys(&random_keys) { - Ok(unwrapped) => { - self.set_unwrapped(event.id, &unwrapped, &random_keys) - .await - .ok(); + Ok(event) => { + is_cached = true; + event + } + Err(_) => { + match self.client.unwrap_gift_wrap(event).await { + Ok(unwrap) => { + let keys = Keys::generate(); + let Ok(unwrapped) = unwrap.rumor.sign_with_keys(&keys) else { + return false; + }; + + // Save this event to the database for future use. + if let Err(e) = self.set_unwrapped(event.id, &unwrapped, &keys).await { + log::error!("Failed to save event: {e}") + } + unwrapped } - Err(_) => return, - }, - Err(_) => return, - }, + Err(_) => return false, + } + } }; - let mut pubkeys = vec![]; - pubkeys.extend(event.tags.public_keys()); - pubkeys.push(event.pubkey); + // Save the event to the database, use for query directly. + if let Err(e) = self.client.database().save_event(&event).await { + log::error!("Failed to save event: {e}") + } // Send all pubkeys to the batch to sync metadata - self.batch_sender.send(pubkeys).await.ok(); + self.batch_sender.send(event.pubkey).await.ok(); - // Save the event to the database, use for query directly. - self.client.database().save_event(&event).await.ok(); - - // Send this event to the GPUI - if subscription_id == &new_messages_id { - self.global_sender - .send(NostrSignal::Event(event)) - .await - .ok(); + for public_key in event.tags.public_keys().copied() { + self.batch_sender.send(public_key).await.ok(); } + + // Send a notify to GPUI if this is a new message + if incoming { + self.send_signal(NostrSignal::Event(event)).await; + } + + is_cached } /// Extracts public keys from contact list and queues metadata sync @@ -425,8 +574,9 @@ impl Globals { if let Ok(signer) = self.client.signer().await { if let Ok(public_key) = signer.get_public_key().await { if public_key == event.pubkey { - let pubkeys = event.tags.public_keys().copied().collect::>(); - self.batch_sender.send(pubkeys).await.ok(); + for public_key in event.tags.public_keys().copied() { + self.batch_sender.send(public_key).await.ok(); + } } } } @@ -434,6 +584,7 @@ impl Globals { /// Fetches metadata for a batch of public keys pub(crate) async fn sync_data_for_pubkeys(&self, public_keys: BTreeSet) { + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); let kinds = vec![ Kind::Metadata, Kind::ContactList, @@ -447,46 +598,29 @@ impl Globals { if let Err(e) = shared_state() .client - .subscribe_to(BOOTSTRAP_RELAYS, filter, shared_state().auto_close) + .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts)) .await { log::error!("Failed to sync metadata: {e}"); } } - /// Inserts or updates a person's metadata from a Kind::Metadata event - pub(crate) async fn insert_person(&self, event: &Event) { - let metadata = Metadata::from_json(&event.content).ok(); - - self.persons - .write() - .await - .entry(event.pubkey) - .and_modify(|entry| { - if entry.is_none() { - *entry = metadata.clone(); - } - }) - .or_insert_with(|| metadata); - } - /// Notifies UI of application updates via global channel pub(crate) async fn notify_update(&self, event: &Event) { + let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); let filter = Filter::new() .ids(event.tags.event_ids().copied()) .kind(Kind::FileMetadata); if let Err(e) = self .client - .subscribe_to(BOOTSTRAP_RELAYS, filter, self.auto_close) + .subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts)) .await { log::error!("Failed to subscribe for file metadata: {}", e); } else { - self.global_sender - .send(NostrSignal::AppUpdate(event.to_owned())) - .await - .ok(); + self.send_signal(NostrSignal::AppUpdate(event.to_owned())) + .await; } } } diff --git a/crates/identity/src/lib.rs b/crates/identity/src/lib.rs index db6742d..81ac8f5 100644 --- a/crates/identity/src/lib.rs +++ b/crates/identity/src/lib.rs @@ -5,7 +5,7 @@ use client_keys::ClientKeys; use common::handle_auth::CoopAuthUrlHandler; use global::{ constants::{ACCOUNT_D, NIP17_RELAYS, NIP65_RELAYS, NOSTR_CONNECT_TIMEOUT}, - shared_state, NostrSignal, + shared_state, }; use gpui::{ div, prelude::FluentBuilder, red, App, AppContext, Context, Entity, Global, ParentElement, @@ -77,18 +77,14 @@ impl Identity { pub fn load(&mut self, window: &mut Window, cx: &mut Context) { let task = cx.background_spawn(async move { + let database = shared_state().client().database(); + let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(ACCOUNT_D) .limit(1); - if let Some(event) = shared_state() - .client - .database() - .query(filter) - .await? - .first_owned() - { + if let Some(event) = database.query(filter).await?.first_owned() { let secret = event.content; let is_bunker = secret.starts_with("bunker://"); @@ -119,21 +115,17 @@ impl Identity { pub fn unload(&mut self, window: &mut Window, cx: &mut Context) { let task = cx.background_spawn(async move { + let client = shared_state().client(); let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(ACCOUNT_D) .limit(1); // Unset signer - shared_state().client.unset_signer().await; + client.unset_signer().await; // Delete account - shared_state() - .client - .database() - .delete(filter) - .await - .is_ok() + client.database().delete(filter).await.is_ok() }); cx.spawn_in(window, async move |this, cx| { @@ -330,33 +322,23 @@ impl Identity { S: NostrSigner + 'static, { let task: Task> = cx.background_spawn(async move { + let client = shared_state().client(); let public_key = signer.get_public_key().await?; // Update signer - shared_state().client.set_signer(signer).await; + client.set_signer(signer).await; + + // Subscribe for user's data + shared_state().subscribe_for_user_data(public_key).await; // Fetch user's metadata - let metadata = shared_state() - .client - .fetch_metadata(public_key, Duration::from_secs(2)) + let metadata = client + .fetch_metadata(public_key, Duration::from_secs(3)) .await? .unwrap_or_default(); // Create user's profile with public key and metadata - let profile = Profile::new(public_key, metadata); - - // Subscribe for user's data - nostr_sdk::async_utility::task::spawn(async move { - shared_state().subscribe_for_user_data(public_key).await; - }); - - // Notify GPUi via the global channel - shared_state() - .global_sender - .send(NostrSignal::SignerUpdated) - .await?; - - Ok(profile) + Ok(Profile::new(public_key, metadata)) }); cx.spawn_in(window, async move |this, cx| match task.await { @@ -389,10 +371,12 @@ impl Identity { self.write_keys(&keys, password, cx); cx.background_spawn(async move { + let client = shared_state().client(); + // Update signer - shared_state().client.set_signer(keys).await; + client.set_signer(keys).await; // Set metadata - shared_state().client.set_metadata(&metadata).await.ok(); + client.set_metadata(&metadata).await.ok(); // Create relay list let builder = EventBuilder::new(Kind::RelayList, "").tags( @@ -405,7 +389,7 @@ impl Identity { }), ); - if let Err(e) = shared_state().client.send_event_builder(builder).await { + if let Err(e) = client.send_event_builder(builder).await { log::error!("Failed to send relay list event: {}", e); }; @@ -420,18 +404,11 @@ impl Identity { }), ); - if let Err(e) = shared_state().client.send_event_builder(builder).await { + if let Err(e) = client.send_event_builder(builder).await { log::error!("Failed to send messaging relay list event: {}", e); }; - // Notify GPUi via the global channel - shared_state() - .global_sender - .send(NostrSignal::SignerUpdated) - .await - .ok(); - - // Subscribe + // Subscribe for user's data shared_state() .subscribe_for_user_data(profile.public_key()) .await; @@ -453,14 +430,16 @@ impl Identity { } cx.background_spawn(async move { + let client = shared_state().client(); let keys = Keys::generate(); + let builder = EventBuilder::new(Kind::ApplicationSpecificData, value).tags(vec![ Tag::identifier(ACCOUNT_D), Tag::public_key(public_key), ]); if let Ok(event) = builder.sign(&keys).await { - if let Err(e) = shared_state().client.database().save_event(&event).await { + if let Err(e) = client.database().save_event(&event).await { log::error!("Failed to save event: {e}"); }; } @@ -476,7 +455,9 @@ impl Identity { if let Ok(enc_key) = EncryptedSecretKey::new(keys.secret_key(), &password, 16, KeySecurity::Medium) { + let client = shared_state().client(); let keys = Keys::generate(); + let builder = EventBuilder::new(Kind::ApplicationSpecificData, enc_key.to_bech32().unwrap()) .tags(vec![ @@ -485,7 +466,7 @@ impl Identity { ]); if let Ok(event) = builder.sign(&keys).await { - if let Err(e) = shared_state().client.database().save_event(&event).await { + if let Err(e) = client.database().save_event(&event).await { log::error!("Failed to save event: {e}"); }; } diff --git a/crates/settings/src/lib.rs b/crates/settings/src/lib.rs index a79355d..f72f2f2 100644 --- a/crates/settings/src/lib.rs +++ b/crates/settings/src/lib.rs @@ -85,18 +85,14 @@ impl AppSettings { pub(crate) fn get_settings_from_db(&self, cx: &mut Context) { let task: Task> = cx.background_spawn(async move { + let database = shared_state().client().database(); + let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(SETTINGS_D) .limit(1); - if let Some(event) = shared_state() - .client - .database() - .query(filter) - .await? - .first_owned() - { + if let Some(event) = database.query(filter).await?.first_owned() { log::info!("Successfully loaded settings from database"); Ok(serde_json::from_str(&event.content)?) } else { @@ -120,13 +116,14 @@ impl AppSettings { if let Ok(content) = serde_json::to_string(&self.settings) { cx.background_spawn(async move { let keys = Keys::generate(); + let database = shared_state().client().database(); if let Ok(event) = EventBuilder::new(Kind::ApplicationSpecificData, content) .tags(vec![Tag::identifier(SETTINGS_D)]) .sign(&keys) .await { - if let Err(e) = shared_state().client.database().save_event(&event).await { + if let Err(e) = database.save_event(&event).await { log::error!("Failed to save user settings: {e}"); } else { log::info!("New settings have been saved successfully");