feat: resend failed messages (#147)

* .

* .

* fix

* fix

* update

* fix

* .

* .
This commit is contained in:
reya
2025-09-12 17:07:57 +07:00
committed by GitHub
parent 2ea5feaf4b
commit 2ea2519e8b
6 changed files with 261 additions and 157 deletions

View File

@@ -1,4 +1,5 @@
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::time::Duration;
@@ -16,45 +17,51 @@ use crate::Registry;
#[derive(Debug, Clone)]
pub struct SendReport {
pub receiver: PublicKey,
pub output: Option<Output<EventId>>,
pub local_error: Option<SharedString>,
pub nip17_relays_not_found: bool,
pub tags: Option<Vec<Tag>>,
pub status: Option<Output<EventId>>,
pub error: Option<SharedString>,
pub relays_not_found: bool,
}
impl SendReport {
pub fn output(receiver: PublicKey, output: Output<EventId>) -> Self {
pub fn new(receiver: PublicKey) -> Self {
Self {
receiver,
output: Some(output),
local_error: None,
nip17_relays_not_found: false,
status: None,
error: None,
tags: None,
relays_not_found: false,
}
}
pub fn error(receiver: PublicKey, error: impl Into<SharedString>) -> Self {
Self {
receiver,
output: None,
local_error: Some(error.into()),
nip17_relays_not_found: false,
}
pub fn not_found(mut self) -> Self {
self.relays_not_found = true;
self
}
pub fn nip17_relays_not_found(receiver: PublicKey) -> Self {
Self {
receiver,
output: None,
local_error: None,
nip17_relays_not_found: true,
}
pub fn error(mut self, error: impl Into<SharedString>) -> Self {
self.error = Some(error.into());
self.relays_not_found = false;
self
}
pub fn status(mut self, output: Output<EventId>) -> Self {
self.status = Some(output);
self.relays_not_found = false;
self
}
pub fn tags(mut self, tags: &Vec<Tag>) -> Self {
self.tags = Some(tags.to_owned());
self
}
pub fn is_relay_error(&self) -> bool {
self.local_error.is_some() || self.nip17_relays_not_found
self.error.is_some() || self.relays_not_found
}
pub fn is_sent_success(&self) -> bool {
if let Some(output) = self.output.as_ref() {
if let Some(output) = self.status.as_ref() {
!output.success.is_empty()
} else {
false
@@ -338,19 +345,21 @@ impl Room {
cx.background_spawn(async move {
let client = nostr_client();
let public_key = members[members.len() - 1];
let filter = Filter::new()
let sent = Filter::new()
.kind(Kind::PrivateDirectMessage)
.authors(members.clone())
.author(public_key)
.pubkeys(members.clone());
let events: Vec<Event> = client
.database()
.query(filter)
.await?
.into_iter()
.filter(|ev| ev.compare_pubkeys(&members))
.collect();
let recv = Filter::new()
.kind(Kind::PrivateDirectMessage)
.authors(members)
.pubkey(public_key);
let sent_events = client.database().query(sent).await?;
let recv_events = client.database().query(recv).await?;
let events: Vec<Event> = sent_events.merge(recv_events).into_iter().collect();
Ok(events)
})
@@ -398,17 +407,7 @@ impl Room {
event
}
/// Sends a message to all members in the background task
///
/// # Arguments
///
/// * `content` - The content of the message to send
/// * `cx` - The App context
///
/// # Returns
///
/// A Task that resolves to Result<Vec<String>, Error> where the
/// strings contain error messages for any failed sends
/// Create a task to sends a message to all members in the background
pub fn send_in_background(
&self,
content: &str,
@@ -422,20 +421,21 @@ impl Room {
let mut public_keys = self.members.clone();
cx.background_spawn(async move {
let css = css();
let client = nostr_client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let mut tags = public_keys
let mut tags: Vec<Tag> = public_keys
.iter()
.filter_map(|pubkey| {
if pubkey != &public_key {
Some(Tag::public_key(*pubkey))
.filter_map(|&this| {
if this != public_key {
Some(Tag::public_key(this))
} else {
None
}
})
.collect_vec();
.collect();
// Add event reference if it's present (replying to another event)
if replies.len() == 1 {
@@ -468,44 +468,50 @@ impl Room {
// Stored all send errors
let mut reports = vec![];
for receiver in public_keys.into_iter() {
for pubkey in public_keys.into_iter() {
match client
.send_private_msg(receiver, &content, tags.clone())
.send_private_msg(pubkey, &content, tags.clone())
.await
{
Ok(output) => {
if output
.failed
.iter()
.any(|(_, msg)| msg.starts_with("auth-required:"))
{
let id = output.id();
let id = output.id().to_owned();
let auth_required = output.failed.iter().any(|m| m.1.starts_with("auth-"));
let report = SendReport::new(pubkey).status(output).tags(&tags);
if auth_required {
// Wait for authenticated and resent event successfully
for attempt in 0..=SEND_RETRY {
// Check if event was successfully resent
if let Some(output) =
css().resent_ids.read().await.iter().find(|o| o.id() == id)
if let Some(output) = css
.resent_ids
.read()
.await
.iter()
.find(|e| e.id() == &id)
.cloned()
{
reports.push(SendReport::output(receiver, output.to_owned()));
let output = SendReport::new(pubkey).status(output).tags(&tags);
reports.push(output);
break;
}
// Check if retry limit exceeded
if attempt == SEND_RETRY {
reports.push(report);
break;
}
smol::Timer::after(Duration::from_secs(1)).await;
smol::Timer::after(Duration::from_millis(1200)).await;
}
} else {
reports.push(SendReport::output(receiver, output));
reports.push(report);
}
}
Err(e) => {
if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e {
reports.push(SendReport::nip17_relays_not_found(receiver));
reports.push(SendReport::new(pubkey).not_found().tags(&tags));
} else {
reports.push(SendReport::error(receiver, e.to_string()));
reports.push(SendReport::new(pubkey).error(e.to_string()).tags(&tags));
}
}
}
@@ -518,13 +524,14 @@ impl Room {
.await
{
Ok(output) => {
reports.push(SendReport::output(public_key, output));
reports.push(SendReport::new(public_key).status(output).tags(&tags));
}
Err(e) => {
if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e {
reports.push(SendReport::nip17_relays_not_found(public_key));
reports.push(SendReport::new(public_key).not_found());
} else {
reports.push(SendReport::error(public_key, e.to_string()));
reports
.push(SendReport::new(public_key).error(e.to_string()).tags(&tags));
}
}
}
@@ -533,4 +540,57 @@ impl Room {
Ok(reports)
})
}
/// Create a task to resend a failed message
pub fn resend(
&self,
reports: Vec<SendReport>,
message: String,
backup: bool,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
cx.background_spawn(async move {
let client = nostr_client();
let mut resend_reports = vec![];
let mut resend_tag = vec![];
for report in reports.into_iter() {
if let Some(output) = report.status {
let id = output.id();
let urls: Vec<&RelayUrl> = output.failed.keys().collect();
if let Some(event) = client.database().event_by_id(id).await? {
for url in urls.into_iter() {
let relay = client.pool().relay(url).await?;
let id = relay.send_event(&event).await?;
let resent: Output<EventId> = Output {
val: id,
success: HashSet::from([url.to_owned()]),
failed: HashMap::new(),
};
resend_reports.push(SendReport::new(report.receiver).status(resent));
}
if let Some(tags) = report.tags {
resend_tag.extend(tags);
}
}
}
}
// Only send a backup message to current user if sent successfully to others
if backup && !resend_reports.is_empty() {
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let output = client
.send_private_msg(public_key, message, resend_tag)
.await?;
resend_reports.push(SendReport::new(public_key).status(output));
}
Ok(resend_reports)
})
}
}