diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index db4f140..43c97ca 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -6,10 +6,10 @@ use anyhow::{anyhow, Error}; use assets::Assets; use common::event::EventUtils; use global::constants::{ - ALL_MESSAGES_ID, APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, - METADATA_BATCH_TIMEOUT, NEW_MESSAGE_ID, SEARCH_RELAYS, + APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT, + SEARCH_RELAYS, WAIT_FOR_FINISH, }; -use global::{nostr_client, set_all_gift_wraps_fetched, NostrSignal}; +use global::{gift_wrap_sub_id, nostr_client, starting_time, NostrSignal}; use gpui::{ actions, point, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations, @@ -39,6 +39,9 @@ fn main() { // Initialize the Nostr Client let client = nostr_client(); + // Initialize the starting time + let _ = starting_time(); + // Initialize the Application let app = Application::new() .with_assets(Assets) @@ -46,7 +49,7 @@ fn main() { let (signal_tx, signal_rx) = channel::bounded::(2048); let (mta_tx, mta_rx) = channel::bounded::(1024); - let (event_tx, event_rx) = channel::unbounded::(); + let (event_tx, event_rx) = channel::bounded::(2048); let signal_tx_clone = signal_tx.clone(); let mta_tx_clone = mta_tx.clone(); @@ -131,7 +134,7 @@ fn main() { continue; } - let duration = smol::Timer::after(Duration::from_secs(30)); + let duration = smol::Timer::after(Duration::from_secs(WAIT_FOR_FINISH)); let recv = || async { // no inline @@ -145,8 +148,7 @@ fn main() { match smol::future::or(recv(), timeout()).await { Some(event) => { - // Process the gift wrap event unwrapping - let cached = try_unwrap_event(&signal_tx, &mta_tx, &event, false).await; + let cached = try_unwrap_event(&event, &signal_tx, &mta_tx).await; // Increment the total messages counter if message is not from cache if !cached { @@ -163,17 +165,9 @@ fn main() { None => { // Notify the UI that the processing is finished signal_tx.send(NostrSignal::Finish).await.ok(); - // Mark all gift wraps as fetched - // For the next time Coop only needs to process new gift wraps - set_all_gift_wraps_fetched().await; - - break; } } } - - // Event channel is no longer needed when all gift wrap events have been processed - event_rx.close(); }) .detach(); @@ -246,8 +240,6 @@ fn main() { // Spawn a task to handle events from nostr channel cx.spawn_in(window, async move |_, cx| { - let all_messages = SubscriptionId::new(ALL_MESSAGES_ID); - while let Ok(signal) = signal_rx.recv().await { cx.update(|window, cx| { let registry = Registry::global(cx); @@ -277,8 +269,8 @@ fn main() { } // Load chat rooms without setting as finished NostrSignal::Eose(subscription_id) => { - // Only load chat rooms if the subscription ID matches the all_messages_sub_id - if subscription_id == all_messages { + // Only load chat rooms if the subscription matches the gift wrap subscription + if gift_wrap_sub_id() == &subscription_id { registry.update(cx, |this, cx| { this.load_rooms(window, cx); }); @@ -369,9 +361,7 @@ async fn handle_nostr_notifications( mta_tx: &Sender, event_tx: &Sender, ) -> Result<(), Error> { - let new_messages = SubscriptionId::new(NEW_MESSAGE_ID); let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); - let mut notifications = client.notifications(); let mut processed_events: BTreeSet = BTreeSet::new(); let mut processed_dm_relays: BTreeSet = BTreeSet::new(); @@ -382,10 +372,7 @@ async fn handle_nostr_notifications( }; match message { - RelayMessage::Event { - event, - subscription_id, - } => { + RelayMessage::Event { event, .. } => { if processed_events.contains(&event.id) { continue; } @@ -394,14 +381,7 @@ async fn handle_nostr_notifications( match event.kind { Kind::GiftWrap => { - // Process to unwrap directly if event come from new messages subscription - // Otherwise, send the event to the event_tx channel - if *subscription_id == new_messages { - log::info!("receive a new message: {:?}", event.id); - try_unwrap_event(signal_tx, mta_tx, &event, true).await; - } else { - event_tx.send(event.into_owned()).await.ok(); - } + event_tx.send(event.into_owned()).await.ok(); } Kind::Metadata => { signal_tx @@ -428,13 +408,7 @@ async fn handle_nostr_notifications( .kind(Kind::InboxRelays) .limit(1); - if let Ok(output) = client.subscribe(filter, Some(opts)).await { - log::info!( - "Subscribed to get DM relays: {} - Relays: {:?}", - event.pubkey.to_bech32().unwrap(), - output.success - ) - } + client.subscribe(filter, Some(opts)).await.ok(); } _ => {} } @@ -517,10 +491,9 @@ async fn get_unwrapped(root: EventId) -> Result { /// Unwraps a gift-wrapped event and processes its contents. async fn try_unwrap_event( + event: &Event, signal_tx: &Sender, mta_tx: &Sender, - event: &Event, - incoming: bool, ) -> bool { let client = nostr_client(); let mut is_cached = false; @@ -554,16 +527,13 @@ async fn try_unwrap_event( } }; - // Get all pubkeys from the event - let all_pubkeys = event.all_pubkeys(); - // Send all pubkeys to the metadata batch to sync data - for public_key in all_pubkeys { + for public_key in event.all_pubkeys() { mta_tx.send(public_key).await.ok(); } // Send a notify to GPUI if this is a new message - if incoming { + if starting_time() <= &event.created_at { signal_tx.send(NostrSignal::GiftWrap(event)).await.ok(); } diff --git a/crates/coop/src/views/chat/mod.rs b/crates/coop/src/views/chat/mod.rs index 8aa1351..591de1f 100644 --- a/crates/coop/src/views/chat/mod.rs +++ b/crates/coop/src/views/chat/mod.rs @@ -42,6 +42,9 @@ use ui::{ mod subject; +const DUPLICATE_TIME_WINDOW: u64 = 10; +const MAX_RECENT_MESSAGES_TO_CHECK: usize = 5; + #[derive(Action, Clone, PartialEq, Eq, Deserialize)] #[action(namespace = chat, no_json)] pub struct ChangeSubject(pub String); @@ -131,7 +134,7 @@ impl Chat { RoomSignal::Refresh => { this.load_messages(window, cx); } - } + }; }, )); @@ -236,11 +239,16 @@ impl Chat { return false; } - let min_timestamp = new_msg.created_at.as_u64().saturating_sub(10); + let messages = self.messages.read(cx); + let min_timestamp = new_msg + .created_at + .as_u64() + .saturating_sub(DUPLICATE_TIME_WINDOW); - self.messages - .read(cx) + messages .iter() + .rev() + .take(MAX_RECENT_MESSAGES_TO_CHECK) .filter(|m| m.author == identity) .any(|existing| { // Check if messages are within the time window diff --git a/crates/coop/src/views/messaging_relays.rs b/crates/coop/src/views/messaging_relays.rs index 20c2ec5..ecfc576 100644 --- a/crates/coop/src/views/messaging_relays.rs +++ b/crates/coop/src/views/messaging_relays.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::{anyhow, Error}; -use global::constants::{NEW_MESSAGE_ID, NIP17_RELAYS}; +use global::constants::{GIFT_WRAP_SUB_ID, NIP17_RELAYS}; use global::nostr_client; use gpui::prelude::FluentBuilder; use gpui::{ @@ -208,11 +208,8 @@ impl MessagingRelays { _ = client.connect_relay(&relay).await; } - let id = SubscriptionId::new(NEW_MESSAGE_ID); - let new_messages = Filter::new() - .kind(Kind::GiftWrap) - .pubkey(public_key) - .limit(0); + let id = SubscriptionId::new(GIFT_WRAP_SUB_ID); + let new_messages = Filter::new().kind(Kind::GiftWrap).pubkey(public_key); // Close old subscriptions client.unsubscribe(&id).await; diff --git a/crates/global/src/constants.rs b/crates/global/src/constants.rs index 8955ab0..913c02a 100644 --- a/crates/global/src/constants.rs +++ b/crates/global/src/constants.rs @@ -36,20 +36,21 @@ pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; /// Default timeout (in seconds) for Nostr Connect pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; -/// Unique ID for new message subscription. -pub const NEW_MESSAGE_ID: &str = "listen_new_giftwraps"; -/// Unique ID for all messages subscription. -pub const ALL_MESSAGES_ID: &str = "listen_all_giftwraps"; -/// Unique ID for all newest messages subscription. -pub const ALL_NEWEST_MESSAGES_ID: &str = "listen_all_newest_giftwraps"; +/// Unique ID for all gift wraps subscription. +pub const GIFT_WRAP_SUB_ID: &str = "listen_for_giftwraps"; /// Total metadata requests will be grouped. pub const METADATA_BATCH_LIMIT: usize = 100; + /// Maximum timeout for grouping metadata requests. pub const METADATA_BATCH_TIMEOUT: u64 = 400; +/// Maximum timeout for waiting for finish (seconds) +pub const WAIT_FOR_FINISH: u64 = 60; + /// Default width for all modals. pub const DEFAULT_MODAL_WIDTH: f32 = 420.; + /// Default width of the sidebar. pub const DEFAULT_SIDEBAR_WIDTH: f32 = 280.; diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index 9b471b1..ae32124 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -5,6 +5,7 @@ use nostr_connect::prelude::*; use nostr_sdk::prelude::*; use paths::nostr_file; +use crate::constants::GIFT_WRAP_SUB_ID; use crate::paths::support_dir; pub mod constants; @@ -33,6 +34,8 @@ pub enum NostrSignal { } static NOSTR_CLIENT: OnceLock = OnceLock::new(); +static GIFT_WRAP_ID: OnceLock = OnceLock::new(); +static CURRENT_TIMESTAMP: OnceLock = OnceLock::new(); static FIRST_RUN: OnceLock = OnceLock::new(); pub fn nostr_client() -> &'static Client { @@ -61,6 +64,14 @@ pub fn nostr_client() -> &'static Client { }) } +pub fn gift_wrap_sub_id() -> &'static SubscriptionId { + GIFT_WRAP_ID.get_or_init(|| SubscriptionId::new(GIFT_WRAP_SUB_ID)) +} + +pub fn starting_time() -> &'static Timestamp { + CURRENT_TIMESTAMP.get_or_init(Timestamp::now) +} + pub fn first_run() -> &'static bool { FIRST_RUN.get_or_init(|| { let flag = support_dir().join(format!(".{}-first_run", env!("CARGO_PKG_VERSION"))); @@ -75,16 +86,3 @@ pub fn first_run() -> &'static bool { } }) } - -pub async fn set_all_gift_wraps_fetched() { - let flag = support_dir().join(".fetched"); - - if !flag.exists() && smol::fs::write(&flag, "").await.is_err() { - log::error!("Failed to create full run flag"); - } -} - -pub fn is_gift_wraps_fetch_complete() -> bool { - let flag = support_dir().join(".fetched"); - flag.exists() -} diff --git a/crates/identity/src/lib.rs b/crates/identity/src/lib.rs index 6c872f1..57f3f2e 100644 --- a/crates/identity/src/lib.rs +++ b/crates/identity/src/lib.rs @@ -3,11 +3,8 @@ use std::time::Duration; use anyhow::{anyhow, Error}; use client_keys::ClientKeys; use common::handle_auth::CoopAuthUrlHandler; -use global::constants::{ - ACCOUNT_D, ALL_MESSAGES_ID, ALL_NEWEST_MESSAGES_ID, NEW_MESSAGE_ID, NIP17_RELAYS, NIP65_RELAYS, - NOSTR_CONNECT_TIMEOUT, -}; -use global::{is_gift_wraps_fetch_complete, nostr_client}; +use global::constants::{ACCOUNT_D, NIP17_RELAYS, NIP65_RELAYS, NOSTR_CONNECT_TIMEOUT}; +use global::{gift_wrap_sub_id, nostr_client}; use gpui::prelude::FluentBuilder; use gpui::{ div, red, App, AppContext, Context, Entity, Global, ParentElement, SharedString, Styled, @@ -632,13 +629,15 @@ impl Identity { } pub(crate) async fn subscribe(client: &Client, public_key: PublicKey) -> Result<(), Error> { - let all_messages = SubscriptionId::new(ALL_MESSAGES_ID); - let all_newest_messages = SubscriptionId::new(ALL_NEWEST_MESSAGES_ID); - let new_messages = SubscriptionId::new(NEW_MESSAGE_ID); - // Subscription options let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); - // Get the gift wraps fetch status - let is_completed = is_gift_wraps_fetch_complete(); + + client + .subscribe_with_id( + gift_wrap_sub_id().to_owned(), + Filter::new().kind(Kind::GiftWrap).pubkey(public_key), + None, + ) + .await?; client .subscribe( @@ -660,37 +659,6 @@ impl Identity { ) .await?; - client - .subscribe_with_id( - new_messages, - Filter::new() - .kind(Kind::GiftWrap) - .pubkey(public_key) - .limit(0), - None, - ) - .await?; - - if is_completed { - let week_ago: u64 = 7 * 24 * 60 * 60; - let since = Timestamp::from_secs(Timestamp::now().as_u64() - week_ago); - - let filter = Filter::new() - .pubkey(public_key) - .kind(Kind::GiftWrap) - .since(since); - - client - .subscribe_with_id(all_newest_messages, filter, Some(opts)) - .await?; - } else { - let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key); - - client - .subscribe_with_id(all_messages, filter, Some(opts)) - .await?; - }; - log::info!("Getting all user's metadata and messages..."); Ok(())