From 2ea2519e8b0e1ffa8f75e36c35d2c25fd4f6a904 Mon Sep 17 00:00:00 2001 From: reya <123083837+reyamir@users.noreply.github.com> Date: Fri, 12 Sep 2025 17:07:57 +0700 Subject: [PATCH] feat: resend failed messages (#147) * . * . * fix * fix * update * fix * . * . --- crates/coop/src/chatspace.rs | 63 +++++----- crates/coop/src/views/chat/mod.rs | 153 +++++++++++++++--------- crates/global/src/constants.rs | 2 +- crates/global/src/lib.rs | 6 +- crates/registry/src/room.rs | 192 ++++++++++++++++++++---------- locales/app.yml | 2 + 6 files changed, 261 insertions(+), 157 deletions(-) diff --git a/crates/coop/src/chatspace.rs b/crates/coop/src/chatspace.rs index 5c76ca3..ae8ed9b 100644 --- a/crates/coop/src/chatspace.rs +++ b/crates/coop/src/chatspace.rs @@ -64,18 +64,22 @@ pub fn new_account(window: &mut Window, cx: &mut App) { } pub struct ChatSpace { - // Workspace + // App's Title Bar title_bar: Entity, + + // App's Dock Area dock: Entity, - // Temporarily store all authentication requests - auth_requests: HashMap, + // All authentication requests + auth_requests: HashMap, // Local state to determine if the user has set up NIP-17 relays - has_nip17_relays: bool, + nip17_relays: bool, - // System + // All subscriptions for observing the app state _subscriptions: SmallVec<[Subscription; 3]>, + + // All long running tasks _tasks: SmallVec<[Task<()>; 5]>, } @@ -182,7 +186,7 @@ impl ChatSpace { dock, title_bar, auth_requests: HashMap::new(), - has_nip17_relays: true, + nip17_relays: true, _subscriptions: subscriptions, _tasks: tasks, } @@ -573,7 +577,7 @@ impl ChatSpace { } Signal::DmRelayNotFound => { view.update(cx, |this, cx| { - this.set_no_nip17_relays(cx); + this.set_required_relays(cx); }) .ok(); } @@ -728,10 +732,8 @@ impl ChatSpace { match event.created_at >= css.init_at { // New message: send a signal to notify the UI true => { - // Prevent notification if the event was sent by Coop - if !css.sent_ids.read().await.contains(&target.id) { - ingester.send(Signal::Message((target.id, event))).await; - } + smol::Timer::after(Duration::from_millis(200)).await; + ingester.send(Signal::Message((target.id, event))).await; } // Old message: Coop is probably processing the user's messages during initial load false => { @@ -942,20 +944,20 @@ impl ChatSpace { } fn reopen_auth_request(&mut self, window: &mut Window, cx: &mut Context) { - for req in self.auth_requests.clone().into_iter() { - self.open_auth_request(req.0, window, cx); + for (_, request) in self.auth_requests.clone().into_iter() { + self.open_auth_request(request, window, cx); } } fn push_auth_request(&mut self, req: &AuthRequest, cx: &mut Context) { - self.auth_requests.insert(req.to_owned(), false); + self.auth_requests.insert(req.url.clone(), req.to_owned()); cx.notify(); } fn sending_auth_request(&mut self, challenge: &str, cx: &mut Context) { - for (req, status) in self.auth_requests.iter_mut() { + for (_, req) in self.auth_requests.iter_mut() { if req.challenge == challenge { - *status = true; + req.sending = true; cx.notify(); } } @@ -965,16 +967,16 @@ impl ChatSpace { if let Some(req) = self .auth_requests .iter() - .find(|(req, _)| req.challenge == challenge) + .find(|(_, req)| req.challenge == challenge) { - req.1.to_owned() + req.1.sending } else { false } } fn remove_auth_request(&mut self, challenge: &str, cx: &mut Context) { - self.auth_requests.retain(|r, _| r.challenge != challenge); + self.auth_requests.retain(|_, r| r.challenge != challenge); cx.notify(); } @@ -1026,8 +1028,8 @@ impl ChatSpace { }); } - fn set_no_nip17_relays(&mut self, cx: &mut Context) { - self.has_nip17_relays = false; + fn set_required_relays(&mut self, cx: &mut Context) { + self.nip17_relays = false; cx.notify(); } @@ -1054,19 +1056,13 @@ impl ChatSpace { cx.spawn_in(window, async move |this, cx| { if let Ok((secret, profile)) = task.await { - cx.update(|window, cx| { - this.update(cx, |this, cx| { - this.set_account_layout(secret, profile, window, cx); - }) - .ok(); + this.update_in(cx, |this, window, cx| { + this.set_account_layout(secret, profile, window, cx); }) .ok(); } else { - cx.update(|window, cx| { - this.update(cx, |this, cx| { - this.set_onboarding_layout(window, cx); - }) - .ok(); + this.update_in(cx, |this, window, cx| { + this.set_onboarding_layout(window, cx); }) .ok(); } @@ -1283,7 +1279,6 @@ impl ChatSpace { cx: &mut Context, ) -> impl IntoElement { let proxy = AppSettings::get_proxy_user_avatars(cx); - let is_auto_auth = AppSettings::read_global(cx).is_auto_auth(); let updating = AutoUpdater::read_global(cx).status.is_updating(); let updated = AutoUpdater::read_global(cx).status.is_updated(); let auth_requests = self.auth_requests.len(); @@ -1322,7 +1317,7 @@ impl ChatSpace { }), ) }) - .when(auth_requests > 0 && !is_auto_auth, |this| { + .when(auth_requests > 0, |this| { this.child( h_flex() .id("requests") @@ -1342,7 +1337,7 @@ impl ChatSpace { })), ) }) - .when(!self.has_nip17_relays, |this| { + .when(!self.nip17_relays, |this| { this.child(setup_nip17_relay(t!("relays.button"))) }) .child( diff --git a/crates/coop/src/views/chat/mod.rs b/crates/coop/src/views/chat/mod.rs index a4d3186..1c149a5 100644 --- a/crates/coop/src/views/chat/mod.rs +++ b/crates/coop/src/views/chat/mod.rs @@ -24,7 +24,7 @@ use smallvec::{smallvec, SmallVec}; use smol::fs; use theme::ActiveTheme; use ui::avatar::Avatar; -use ui::button::{Button, ButtonVariants}; +use ui::button::{Button, ButtonRounded, ButtonVariants}; use ui::dock_area::panel::{Panel, PanelEvent}; use ui::emoji_picker::EmojiPicker; use ui::input::{InputEvent, InputState, TextInput}; @@ -307,6 +307,38 @@ impl Chat { .detach(); } + fn resend_message(&mut self, id: &EventId, window: &mut Window, cx: &mut Context) { + if let Some(reports) = self.reports_by_id.get(id).cloned() { + if let Some(message) = self.message(id) { + let backup = AppSettings::get_backup_messages(cx); + let id_clone = id.to_owned(); + let message = message.content.to_owned(); + let task = self.room.read(cx).resend(reports, message, backup, cx); + + cx.spawn_in(window, async move |this, cx| { + match task.await { + Ok(reports) => { + this.update(cx, |this, cx| { + this.reports_by_id.entry(id_clone).and_modify(|this| { + *this = reports; + }); + cx.notify(); + }) + .ok(); + } + Err(e) => { + cx.update(|window, cx| { + window.push_notification(e.to_string(), cx); + }) + .ok(); + } + }; + }) + .detach(); + } + } + } + /// Check if a message failed to send by its ID fn is_sent_failed(&self, id: &EventId) -> bool { self.reports_by_id @@ -609,7 +641,23 @@ impl Chat { }) .child(text) .when(is_sent_failed, |this| { - this.child(self.render_message_reports(&id, cx)) + this.child( + h_flex() + .gap_1() + .child(self.render_message_reports(&id, cx)) + .child( + Button::new(SharedString::from(id.to_hex())) + .label(t!("common.resend")) + .danger() + .xsmall() + .rounded(ButtonRounded::Full) + .on_click(cx.listener( + move |this, _, window, cx| { + this.resend_message(&id, window, cx); + }, + )), + ), + ) }), ), ) @@ -677,15 +725,17 @@ impl Chat { } fn render_message_sent(&self, id: &EventId, _cx: &Context) -> impl IntoElement { - div().id("").child(shared_t!("chat.sent")).when_some( - self.sent_reports(id).cloned(), - |this, reports| { + div() + .id(SharedString::from(id.to_hex())) + .child(shared_t!("chat.sent")) + .when_some(self.sent_reports(id).cloned(), |this, reports| { this.on_click(move |_e, window, cx| { let reports = reports.clone(); window.open_modal(cx, move |this, _window, cx| { - this.title(shared_t!("chat.reports")).child( - v_flex().pb_4().gap_4().children({ + this.show_close(true) + .title(shared_t!("chat.reports")) + .child(v_flex().pb_4().gap_4().children({ let mut items = Vec::with_capacity(reports.len()); for report in reports.iter() { @@ -693,30 +743,29 @@ impl Chat { } items - }), - ) + })) }); }) - }, - ) + }) } fn render_message_reports(&self, id: &EventId, cx: &Context) -> impl IntoElement { h_flex() - .id("") - .gap_1() + .id(SharedString::from(id.to_hex())) + .gap_0p5() .text_color(cx.theme().danger_foreground) .text_xs() .italic() - .child(Icon::new(IconName::Info).small()) + .child(Icon::new(IconName::Info).xsmall()) .child(shared_t!("chat.sent_failed")) .when_some(self.sent_reports(id).cloned(), |this, reports| { this.on_click(move |_e, window, cx| { let reports = reports.clone(); window.open_modal(cx, move |this, _window, cx| { - this.title(shared_t!("chat.reports")).child( - v_flex().pb_4().gap_4().children({ + this.show_close(true) + .title(shared_t!("chat.reports")) + .child(v_flex().gap_4().pb_4().w_full().children({ let mut items = Vec::with_capacity(reports.len()); for report in reports.iter() { @@ -724,8 +773,7 @@ impl Chat { } items - }), - ) + })) }); }) }) @@ -739,6 +787,7 @@ impl Chat { v_flex() .gap_2() + .w_full() .child( h_flex() .gap_2() @@ -752,7 +801,7 @@ impl Chat { .child(name.clone()), ), ) - .when(report.nip17_relays_not_found, |this| { + .when(report.relays_not_found, |this| { this.child( h_flex() .flex_wrap() @@ -773,7 +822,7 @@ impl Chat { ), ) }) - .when_some(report.local_error.clone(), |this, error| { + .when_some(report.error.clone(), |this, error| { this.child( h_flex() .flex_wrap() @@ -788,38 +837,36 @@ impl Chat { .child(div().flex_1().w_full().text_center().child(error)), ) }) - .when_some(report.output.clone(), |this, output| { + .when_some(report.status.clone(), |this, output| { this.child( v_flex() .gap_2() - .text_xs() + .w_full() .children({ let mut items = Vec::with_capacity(output.failed.len()); for (url, msg) in output.failed.into_iter() { items.push( - h_flex() - .gap_1() - .justify_between() - .text_sm() + v_flex() + .gap_0p5() + .py_1() + .px_2() + .w_full() + .rounded(cx.theme().radius) + .bg(cx.theme().elevated_surface_background) .child( div() - .flex_1() - .py_0p5() - .px_2() - .bg(cx.theme().elevated_surface_background) - .rounded_sm() - .child(url.to_string()), + .text_xs() + .font_semibold() + .line_height(relative(1.25)) + .child(SharedString::from(url.to_string())), ) .child( div() - .flex_1() - .py_0p5() - .px_2() - .bg(cx.theme().danger_background) + .text_sm() .text_color(cx.theme().danger_foreground) - .rounded_sm() - .child(msg.to_string()), + .line_height(relative(1.25)) + .child(SharedString::from(msg.to_string())), ), ) } @@ -831,27 +878,25 @@ impl Chat { for url in output.success.into_iter() { items.push( - h_flex() - .gap_1() - .justify_between() - .text_sm() + v_flex() + .gap_0p5() + .py_1() + .px_2() + .w_full() + .rounded(cx.theme().radius) + .bg(cx.theme().elevated_surface_background) .child( div() - .flex_1() - .py_0p5() - .px_2() - .bg(cx.theme().elevated_surface_background) - .rounded_sm() - .child(url.to_string()), + .text_xs() + .font_semibold() + .line_height(relative(1.25)) + .child(SharedString::from(url.to_string())), ) .child( div() - .flex_1() - .py_0p5() - .px_2() - .bg(cx.theme().secondary_background) + .text_sm() .text_color(cx.theme().secondary_foreground) - .rounded_sm() + .line_height(relative(1.25)) .child(shared_t!("chat.sent_success")), ), ) @@ -921,7 +966,7 @@ impl Chat { let path: SharedString = url.to_string().into(); div() - .id("") + .id(SharedString::from(url.to_string())) .relative() .w_16() .child( diff --git a/crates/global/src/constants.rs b/crates/global/src/constants.rs index 668ba55..e59eb47 100644 --- a/crates/global/src/constants.rs +++ b/crates/global/src/constants.rs @@ -37,7 +37,7 @@ pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app"; pub const RELAY_RETRY: u64 = 2; /// Default retry count for sending messages -pub const SEND_RETRY: u64 = 5; +pub const SEND_RETRY: u64 = 10; /// Default timeout (in seconds) for Nostr Connect pub const NOSTR_CONNECT_TIMEOUT: u64 = 200; diff --git a/crates/global/src/lib.rs b/crates/global/src/lib.rs index efaecec..1253adb 100644 --- a/crates/global/src/lib.rs +++ b/crates/global/src/lib.rs @@ -13,16 +13,18 @@ use crate::paths::support_dir; pub mod constants; pub mod paths; -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct AuthRequest { - pub challenge: String, pub url: RelayUrl, + pub challenge: String, + pub sending: bool, } impl AuthRequest { pub fn new(challenge: impl Into, url: RelayUrl) -> Self { Self { challenge: challenge.into(), + sending: false, url, } } diff --git a/crates/registry/src/room.rs b/crates/registry/src/room.rs index 54d2c10..1c58fd4 100644 --- a/crates/registry/src/room.rs +++ b/crates/registry/src/room.rs @@ -1,4 +1,5 @@ use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::time::Duration; @@ -16,45 +17,51 @@ use crate::Registry; #[derive(Debug, Clone)] pub struct SendReport { pub receiver: PublicKey, - pub output: Option>, - pub local_error: Option, - pub nip17_relays_not_found: bool, + pub tags: Option>, + pub status: Option>, + pub error: Option, + pub relays_not_found: bool, } impl SendReport { - pub fn output(receiver: PublicKey, output: Output) -> Self { + pub fn new(receiver: PublicKey) -> Self { Self { receiver, - output: Some(output), - local_error: None, - nip17_relays_not_found: false, + status: None, + error: None, + tags: None, + relays_not_found: false, } } - pub fn error(receiver: PublicKey, error: impl Into) -> Self { - Self { - receiver, - output: None, - local_error: Some(error.into()), - nip17_relays_not_found: false, - } + pub fn not_found(mut self) -> Self { + self.relays_not_found = true; + self } - pub fn nip17_relays_not_found(receiver: PublicKey) -> Self { - Self { - receiver, - output: None, - local_error: None, - nip17_relays_not_found: true, - } + pub fn error(mut self, error: impl Into) -> Self { + self.error = Some(error.into()); + self.relays_not_found = false; + self + } + + pub fn status(mut self, output: Output) -> Self { + self.status = Some(output); + self.relays_not_found = false; + self + } + + pub fn tags(mut self, tags: &Vec) -> Self { + self.tags = Some(tags.to_owned()); + self } pub fn is_relay_error(&self) -> bool { - self.local_error.is_some() || self.nip17_relays_not_found + self.error.is_some() || self.relays_not_found } pub fn is_sent_success(&self) -> bool { - if let Some(output) = self.output.as_ref() { + if let Some(output) = self.status.as_ref() { !output.success.is_empty() } else { false @@ -338,19 +345,21 @@ impl Room { cx.background_spawn(async move { let client = nostr_client(); + let public_key = members[members.len() - 1]; - let filter = Filter::new() + let sent = Filter::new() .kind(Kind::PrivateDirectMessage) - .authors(members.clone()) + .author(public_key) .pubkeys(members.clone()); - let events: Vec = client - .database() - .query(filter) - .await? - .into_iter() - .filter(|ev| ev.compare_pubkeys(&members)) - .collect(); + let recv = Filter::new() + .kind(Kind::PrivateDirectMessage) + .authors(members) + .pubkey(public_key); + + let sent_events = client.database().query(sent).await?; + let recv_events = client.database().query(recv).await?; + let events: Vec = sent_events.merge(recv_events).into_iter().collect(); Ok(events) }) @@ -398,17 +407,7 @@ impl Room { event } - /// Sends a message to all members in the background task - /// - /// # Arguments - /// - /// * `content` - The content of the message to send - /// * `cx` - The App context - /// - /// # Returns - /// - /// A Task that resolves to Result, Error> where the - /// strings contain error messages for any failed sends + /// Create a task to sends a message to all members in the background pub fn send_in_background( &self, content: &str, @@ -422,20 +421,21 @@ impl Room { let mut public_keys = self.members.clone(); cx.background_spawn(async move { + let css = css(); let client = nostr_client(); let signer = client.signer().await?; let public_key = signer.get_public_key().await?; - let mut tags = public_keys + let mut tags: Vec = public_keys .iter() - .filter_map(|pubkey| { - if pubkey != &public_key { - Some(Tag::public_key(*pubkey)) + .filter_map(|&this| { + if this != public_key { + Some(Tag::public_key(this)) } else { None } }) - .collect_vec(); + .collect(); // Add event reference if it's present (replying to another event) if replies.len() == 1 { @@ -468,44 +468,50 @@ impl Room { // Stored all send errors let mut reports = vec![]; - for receiver in public_keys.into_iter() { + for pubkey in public_keys.into_iter() { match client - .send_private_msg(receiver, &content, tags.clone()) + .send_private_msg(pubkey, &content, tags.clone()) .await { Ok(output) => { - if output - .failed - .iter() - .any(|(_, msg)| msg.starts_with("auth-required:")) - { - let id = output.id(); + let id = output.id().to_owned(); + let auth_required = output.failed.iter().any(|m| m.1.starts_with("auth-")); + let report = SendReport::new(pubkey).status(output).tags(&tags); + if auth_required { // Wait for authenticated and resent event successfully for attempt in 0..=SEND_RETRY { // Check if event was successfully resent - if let Some(output) = - css().resent_ids.read().await.iter().find(|o| o.id() == id) + if let Some(output) = css + .resent_ids + .read() + .await + .iter() + .find(|e| e.id() == &id) + .cloned() { - reports.push(SendReport::output(receiver, output.to_owned())); + let output = SendReport::new(pubkey).status(output).tags(&tags); + reports.push(output); break; } + // Check if retry limit exceeded if attempt == SEND_RETRY { + reports.push(report); break; } - smol::Timer::after(Duration::from_secs(1)).await; + smol::Timer::after(Duration::from_millis(1200)).await; } } else { - reports.push(SendReport::output(receiver, output)); + reports.push(report); } } Err(e) => { if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e { - reports.push(SendReport::nip17_relays_not_found(receiver)); + reports.push(SendReport::new(pubkey).not_found().tags(&tags)); } else { - reports.push(SendReport::error(receiver, e.to_string())); + reports.push(SendReport::new(pubkey).error(e.to_string()).tags(&tags)); } } } @@ -518,13 +524,14 @@ impl Room { .await { Ok(output) => { - reports.push(SendReport::output(public_key, output)); + reports.push(SendReport::new(public_key).status(output).tags(&tags)); } Err(e) => { if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e { - reports.push(SendReport::nip17_relays_not_found(public_key)); + reports.push(SendReport::new(public_key).not_found()); } else { - reports.push(SendReport::error(public_key, e.to_string())); + reports + .push(SendReport::new(public_key).error(e.to_string()).tags(&tags)); } } } @@ -533,4 +540,57 @@ impl Room { Ok(reports) }) } + + /// Create a task to resend a failed message + pub fn resend( + &self, + reports: Vec, + message: String, + backup: bool, + cx: &App, + ) -> Task, Error>> { + cx.background_spawn(async move { + let client = nostr_client(); + let mut resend_reports = vec![]; + let mut resend_tag = vec![]; + + for report in reports.into_iter() { + if let Some(output) = report.status { + let id = output.id(); + let urls: Vec<&RelayUrl> = output.failed.keys().collect(); + + if let Some(event) = client.database().event_by_id(id).await? { + for url in urls.into_iter() { + let relay = client.pool().relay(url).await?; + let id = relay.send_event(&event).await?; + let resent: Output = Output { + val: id, + success: HashSet::from([url.to_owned()]), + failed: HashMap::new(), + }; + + resend_reports.push(SendReport::new(report.receiver).status(resent)); + } + + if let Some(tags) = report.tags { + resend_tag.extend(tags); + } + } + } + } + + // Only send a backup message to current user if sent successfully to others + if backup && !resend_reports.is_empty() { + let signer = client.signer().await?; + let public_key = signer.get_public_key().await?; + let output = client + .send_private_msg(public_key, message, resend_tag) + .await?; + + resend_reports.push(SendReport::new(public_key).status(output)); + } + + Ok(resend_reports) + }) + } } diff --git a/locales/app.yml b/locales/app.yml index efb38f3..b743289 100644 --- a/locales/app.yml +++ b/locales/app.yml @@ -49,6 +49,8 @@ common: en: "Relay URL is not valid." recommended: en: "Recommended:" + resend: + en: "Resend" auto_update: updating: