From d392602ed6c2cbd34b564fe312813f94d1c0a292 Mon Sep 17 00:00:00 2001 From: reya <123083837+reyamir@users.noreply.github.com> Date: Tue, 2 Sep 2025 18:19:53 +0700 Subject: [PATCH] feat: resend messages after authentication (#137) * resend failed message * update settings --- crates/coop/src/chatspace.rs | 92 ++++++++++++++++++++++--------- crates/coop/src/main.rs | 9 +-- crates/coop/src/views/chat/mod.rs | 5 +- crates/global/src/constants.rs | 5 +- crates/global/src/lib.rs | 34 ++++++------ crates/registry/src/room.rs | 31 ++++++++++- crates/settings/src/lib.rs | 88 +++++++++++++++-------------- 7 files changed, 166 insertions(+), 98 deletions(-) diff --git a/crates/coop/src/chatspace.rs b/crates/coop/src/chatspace.rs index 179e373..4d6af91 100644 --- a/crates/coop/src/chatspace.rs +++ b/crates/coop/src/chatspace.rs @@ -10,9 +10,9 @@ use common::display::ReadableProfile; use common::event::EventUtils; use global::constants::{ ACCOUNT_IDENTIFIER, BOOTSTRAP_RELAYS, DEFAULT_SIDEBAR_WIDTH, METADATA_BATCH_LIMIT, - METADATA_BATCH_TIMEOUT, SEARCH_RELAYS, TOTAL_RETRY, WAIT_FOR_FINISH, + METADATA_BATCH_TIMEOUT, RELAY_RETRY, SEARCH_RELAYS, WAIT_FOR_FINISH, }; -use global::{ingester, nostr_client, sent_ids, starting_time, AuthReq, IngesterSignal, Notice}; +use global::{css, ingester, nostr_client, AuthRequest, IngesterSignal, Notice}; use gpui::prelude::FluentBuilder; use gpui::{ div, px, rems, App, AppContext, AsyncWindowContext, Axis, Context, Entity, InteractiveElement, @@ -218,14 +218,14 @@ impl ChatSpace { break; } else { retry += 1; - if retry == TOTAL_RETRY { + if retry == RELAY_RETRY { ingester.send(IngesterSignal::DmRelayNotFound).await; break; } } } else { nip65_retry += 1; - if nip65_retry == TOTAL_RETRY { + if nip65_retry == RELAY_RETRY { ingester.send(IngesterSignal::DmRelayNotFound).await; break; } @@ -358,12 +358,12 @@ impl ChatSpace { ) -> Result<(), Error> { let client = nostr_client(); let ingester = ingester(); - let sent_ids = sent_ids(); + let css = css(); let auto_close = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE); let mut processed_events: HashSet = HashSet::new(); - let mut auth_requests: HashMap> = HashMap::new(); + let mut challenges: HashSet> = HashSet::new(); let mut notifications = client.notifications(); while let Ok(notification) = notifications.recv().await { @@ -457,21 +457,28 @@ impl ChatSpace { } } RelayMessage::Auth { challenge } => { - // Prevent duplicate auth requests - if auth_requests - .insert(relay_url.clone(), challenge.clone()) - .is_none() - { - let auth_req = AuthReq::new(challenge, relay_url); - ingester.send(IngesterSignal::Auth(auth_req)).await; + if challenges.insert(challenge.clone()) { + let req = AuthRequest::new(challenge, relay_url); + // Send a signal to the ingester to handle the auth request + ingester.send(IngesterSignal::Auth(req)).await; } } - RelayMessage::Ok { event_id, .. } => { + RelayMessage::Ok { + event_id, message, .. + } => { // Keep track of events sent by Coop - sent_ids.write().await.push(event_id); - } - RelayMessage::Notice(msg) => { - log::info!("Notice: {msg} - {relay_url}"); + css.sent_ids.write().await.insert(event_id); + + // Keep track of events that need to be resent + match MachineReadablePrefix::parse(&message) { + Some(MachineReadablePrefix::AuthRequired) => { + if client.has_signer().await { + css.resend_queue.write().await.insert(event_id, relay_url); + }; + } + Some(_) => {} + None => {} + } } _ => {} } @@ -488,6 +495,7 @@ impl ChatSpace { while let Ok(signal) = signals.recv().await { cx.update(|window, cx| { let registry = Registry::global(cx); + let settings = AppSettings::global(cx); match signal { IngesterSignal::SignerSet(public_key) => { @@ -499,6 +507,11 @@ impl ChatSpace { }) .ok(); + // Load user's settings + settings.update(cx, |this, cx| { + this.load_settings(cx); + }); + // Load all chat rooms registry.update(cx, |this, cx| { this.set_identity(public_key, cx); @@ -520,13 +533,14 @@ impl ChatSpace { IngesterSignal::Auth(req) => { let relay_url = &req.url; let challenge = &req.challenge; - let auth_auth = AppSettings::get_auto_auth(cx); - let auth_relays = AppSettings::read_global(cx).auth_relays(); + let auto_auth = AppSettings::get_auto_auth(cx); + let is_authenticated_relays = + AppSettings::read_global(cx).is_authenticated_relays(relay_url); view.update(cx, |this, cx| { this.push_auth_request(challenge, relay_url, cx); - if auth_auth && auth_relays.contains(relay_url) { + if auto_auth && is_authenticated_relays { // Automatically authenticate if the relay is authenticated before this.auth(challenge, relay_url, window, cx); } else { @@ -708,6 +722,7 @@ impl ChatSpace { async fn unwrap_gift_wrap_event(gift: &Event, pubkey_tx: &Sender) -> bool { let client = nostr_client(); let ingester = ingester(); + let css = css(); let mut is_cached = false; let event = match Self::get_unwrapped_event(gift.id).await { @@ -745,7 +760,7 @@ impl ChatSpace { } // Send a notify to GPUI if this is a new message - if &event.created_at >= starting_time() { + if event.created_at >= css.init_at { ingester .send(IngesterSignal::GiftWrap((gift.id, event))) .await; @@ -800,6 +815,7 @@ impl ChatSpace { let task: Task> = cx.background_spawn(async move { let client = nostr_client(); + let css = css(); let signer = client.signer().await?; // Construct event @@ -827,16 +843,38 @@ impl ChatSpace { } => { if id == event_id { // Re-subscribe to previous subscription - match relay.resubscribe().await { - Ok(_) => { - log::info!("{relay_url} - re-subscribe"); + relay.resubscribe().await?; + + // Get all failed events that need to be resent + let mut queue = css.resend_queue.write().await; + + let ids: Vec = queue + .iter() + .filter(|(_, url)| relay_url == *url) + .map(|(id, _)| *id) + .collect(); + + for id in ids.into_iter() { + if let Some(relay_url) = queue.remove(&id) { + if let Some(event) = client.database().event_by_id(&id).await? { + let event_id = relay.send_event(&event).await?; + + let output = Output { + val: event_id, + failed: HashMap::new(), + success: HashSet::from([relay_url]), + }; + + css.sent_ids.write().await.insert(event_id); + css.resent_ids.write().await.push(output); + } } - Err(e) => return Err(e.into()), } return Ok(()); } } + RelayNotification::AuthenticationFailed => break, RelayNotification::Shutdown => break, _ => {} } @@ -854,7 +892,7 @@ impl ChatSpace { // Save the authenticated relay to automatically authenticate future requests settings.update(cx, |this, cx| { - this.push_auth_relay(url.clone(), cx); + this.push_relay(&url, cx); }); // Clear the current notification diff --git a/crates/coop/src/main.rs b/crates/coop/src/main.rs index a27c845..c04b93c 100644 --- a/crates/coop/src/main.rs +++ b/crates/coop/src/main.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use assets::Assets; use global::constants::{APP_ID, APP_NAME}; -use global::{ingester, nostr_client, sent_ids, starting_time}; +use global::{css, ingester, nostr_client}; use gpui::{ point, px, size, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem, SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations, WindowKind, @@ -29,11 +29,8 @@ fn main() { // Initialize the ingester let _ingester = ingester(); - // Initialize the starting time - let _starting_time = starting_time(); - - // Initialize the sent IDs storage - let _sent_ids = sent_ids(); + // Initialize the coop simple storage + let _css = css(); // Initialize the Application let app = Application::new() diff --git a/crates/coop/src/views/chat/mod.rs b/crates/coop/src/views/chat/mod.rs index fc66711..d3bdf20 100644 --- a/crates/coop/src/views/chat/mod.rs +++ b/crates/coop/src/views/chat/mod.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use common::display::{ReadableProfile, ReadableTimestamp}; use common::nip96::nip96_upload; -use global::{nostr_client, sent_ids}; +use global::{css, nostr_client}; use gpui::prelude::FluentBuilder; use gpui::{ div, img, list, px, red, relative, rems, svg, white, Action, AnyElement, App, AppContext, @@ -220,8 +220,7 @@ impl Chat { /// Check if the event is sent by Coop fn is_sent_by_coop(&self, gift_wrap_id: &EventId) -> bool { - let sent_ids = sent_ids(); - sent_ids.read_blocking().contains(gift_wrap_id) + css().sent_ids.read_blocking().contains(gift_wrap_id) } /// Set the sending state of the chat panel diff --git a/crates/global/src/constants.rs b/crates/global/src/constants.rs index 5755010..cae6f46 100644 --- a/crates/global/src/constants.rs +++ b/crates/global/src/constants.rs @@ -34,7 +34,10 @@ pub const NIP17_RELAYS: [&str; 2] = ["wss://nip17.com", "wss://auth.nostr1.com"] pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; /// Default retry count for fetching NIP-17 relays -pub const TOTAL_RETRY: u64 = 2; +pub const RELAY_RETRY: u64 = 2; + +/// Default retry count for sending messages +pub const SEND_RETRY: u64 = 5; /// Default timeout (in seconds) for Nostr Connect pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index 71bb9fd..0576a30 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -1,3 +1,4 @@ +use std::collections::{HashMap, HashSet}; use std::sync::OnceLock; use std::time::Duration; @@ -12,13 +13,13 @@ use crate::paths::support_dir; pub mod constants; pub mod paths; -#[derive(Debug, Clone)] -pub struct AuthReq { +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct AuthRequest { pub challenge: String, pub url: RelayUrl, } -impl AuthReq { +impl AuthRequest { pub fn new(challenge: impl Into, url: RelayUrl) -> Self { Self { challenge: challenge.into(), @@ -54,7 +55,7 @@ pub enum IngesterSignal { SignerUnset, /// A signal to notify UI that the relay requires authentication - Auth(AuthReq), + Auth(AuthRequest), /// A signal to notify UI that the browser proxy service is down ProxyDown, @@ -107,14 +108,18 @@ impl Ingester { } } +/// A simple storage to store all runtime states that using across the application. +#[derive(Debug, Default)] +pub struct CoopSimpleStorage { + pub init_at: Timestamp, + pub sent_ids: RwLock>, + pub resent_ids: RwLock>>, + pub resend_queue: RwLock>, +} + static NOSTR_CLIENT: OnceLock = OnceLock::new(); - static INGESTER: OnceLock = OnceLock::new(); - -static SENT_IDS: OnceLock>> = OnceLock::new(); - -static CURRENT_TIMESTAMP: OnceLock = OnceLock::new(); - +static COOP_SIMPLE_STORAGE: OnceLock = OnceLock::new(); static FIRST_RUN: OnceLock = OnceLock::new(); pub fn nostr_client() -> &'static Client { @@ -132,7 +137,6 @@ pub fn nostr_client() -> &'static Client { .gossip(true) .automatic_authentication(false) .verify_subscriptions(false) - // Sleep after idle for 30 seconds .sleep_when_idle(SleepWhenIdle::Enabled { timeout: Duration::from_secs(30), }); @@ -145,12 +149,8 @@ pub fn ingester() -> &'static Ingester { INGESTER.get_or_init(Ingester::new) } -pub fn starting_time() -> &'static Timestamp { - CURRENT_TIMESTAMP.get_or_init(Timestamp::now) -} - -pub fn sent_ids() -> &'static RwLock> { - SENT_IDS.get_or_init(|| RwLock::new(Vec::new())) +pub fn css() -> &'static CoopSimpleStorage { + COOP_SIMPLE_STORAGE.get_or_init(CoopSimpleStorage::default) } pub fn first_run() -> &'static bool { diff --git a/crates/registry/src/room.rs b/crates/registry/src/room.rs index 1ba1064..54d2c10 100644 --- a/crates/registry/src/room.rs +++ b/crates/registry/src/room.rs @@ -1,10 +1,12 @@ use std::cmp::Ordering; use std::hash::{Hash, Hasher}; +use std::time::Duration; use anyhow::Error; use common::display::ReadableProfile; use common::event::EventUtils; -use global::nostr_client; +use global::constants::SEND_RETRY; +use global::{css, nostr_client}; use gpui::{App, AppContext, Context, EventEmitter, SharedString, Task}; use itertools::Itertools; use nostr_sdk::prelude::*; @@ -472,7 +474,32 @@ impl Room { .await { Ok(output) => { - reports.push(SendReport::output(receiver, output)); + if output + .failed + .iter() + .any(|(_, msg)| msg.starts_with("auth-required:")) + { + let id = output.id(); + + // Wait for authenticated and resent event successfully + for attempt in 0..=SEND_RETRY { + // Check if event was successfully resent + if let Some(output) = + css().resent_ids.read().await.iter().find(|o| o.id() == id) + { + reports.push(SendReport::output(receiver, output.to_owned())); + break; + } + + if attempt == SEND_RETRY { + break; + } + + smol::Timer::after(Duration::from_secs(1)).await; + } + } else { + reports.push(SendReport::output(receiver, output)); + } } Err(e) => { if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e { diff --git a/crates/settings/src/lib.rs b/crates/settings/src/lib.rs index 7379ed6..4dc0161 100644 --- a/crates/settings/src/lib.rs +++ b/crates/settings/src/lib.rs @@ -94,7 +94,6 @@ impl Global for GlobalAppSettings {} pub struct AppSettings { setting_values: Settings, _subscriptions: SmallVec<[Subscription; 1]>, - _tasks: SmallVec<[Task<()>; 1]>, } impl AppSettings { @@ -113,15 +112,23 @@ impl AppSettings { cx.set_global(GlobalAppSettings(state)); } - fn new(cx: &mut Context) -> Self { - let setting_values = Settings::default(); - let mut tasks = smallvec![]; + fn new(_cx: &mut Context) -> Self { + Self { + setting_values: Settings::default(), + _subscriptions: smallvec![], + } + } + pub fn load_settings(&self, cx: &mut Context) { let task: Task> = cx.background_spawn(async move { let client = nostr_client(); + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; + let filter = Filter::new() .kind(Kind::ApplicationSpecificData) .identifier(SETTINGS_IDENTIFIER) + .author(public_key) .limit(1); if let Some(event) = client.database().query(filter).await?.first_owned() { @@ -131,44 +138,37 @@ impl AppSettings { } }); - tasks.push( - // Load settings from database - cx.spawn(async move |this, cx| { - if let Ok(settings) = task.await { - this.update(cx, |this, cx| { - this.setting_values = settings; - cx.notify(); - }) - .ok(); - } - }), - ); - - Self { - setting_values, - _subscriptions: smallvec![], - _tasks: tasks, - } + cx.spawn(async move |this, cx| { + if let Ok(settings) = task.await { + this.update(cx, |this, cx| { + this.setting_values = settings; + cx.notify(); + }) + .ok(); + } + }) + .detach(); } - pub(crate) fn set_settings(&self, cx: &mut Context) { + pub fn set_settings(&self, cx: &mut Context) { if let Ok(content) = serde_json::to_string(&self.setting_values) { - cx.background_spawn(async move { + let task: Task> = cx.background_spawn(async move { let client = nostr_client(); - let builder = EventBuilder::new(Kind::ApplicationSpecificData, content) - .tags(vec![Tag::identifier(SETTINGS_IDENTIFIER)]) - .sign(&Keys::generate()) - .await; + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; - if let Ok(event) = builder { - if let Err(e) = client.database().save_event(&event).await { - log::error!("Failed to save user settings: {e}"); - } else { - log::info!("New settings have been saved successfully"); - } - } - }) - .detach(); + let event = EventBuilder::new(Kind::ApplicationSpecificData, content) + .tag(Tag::identifier(SETTINGS_IDENTIFIER)) + .build(public_key) + .sign(&Keys::generate()) + .await?; + + client.database().save_event(&event).await?; + + Ok(()) + }); + + task.detach(); } } @@ -176,12 +176,16 @@ impl AppSettings { !self.setting_values.authenticated_relays.is_empty() && self.setting_values.auto_auth } - pub fn auth_relays(&self) -> Vec { - self.setting_values.authenticated_relays.clone() + pub fn is_authenticated_relays(&self, url: &RelayUrl) -> bool { + self.setting_values.authenticated_relays.contains(url) } - pub fn push_auth_relay(&mut self, relay_url: RelayUrl, cx: &mut Context) { - self.setting_values.authenticated_relays.push(relay_url); - cx.notify(); + pub fn push_relay(&mut self, relay_url: &RelayUrl, cx: &mut Context) { + if !self.is_authenticated_relays(relay_url) { + self.setting_values + .authenticated_relays + .push(relay_url.to_owned()); + cx.notify(); + } } }