chore: refactor message sending (#172)

* refactor send message

* refactor resend

* fix

* refactor

* clean up
This commit is contained in:
reya
2025-09-30 08:59:38 +07:00
committed by GitHub
parent 880ba30d20
commit 0db48bc003
6 changed files with 337 additions and 322 deletions

View File

@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::time::Duration;
use anyhow::Error;
use anyhow::{anyhow, Error};
use common::display::RenderedProfile;
use common::event::EventUtils;
use global::constants::SEND_RETRY;
@@ -17,9 +17,9 @@ use crate::Registry;
#[derive(Debug, Clone)]
pub struct SendReport {
pub receiver: PublicKey,
pub tags: Option<Vec<Tag>>,
pub status: Option<Output<EventId>>,
pub error: Option<SharedString>,
pub on_hold: Option<Event>,
pub relays_not_found: bool,
}
@@ -29,13 +29,14 @@ impl SendReport {
receiver,
status: None,
error: None,
tags: None,
on_hold: None,
relays_not_found: false,
}
}
pub fn not_found(mut self) -> Self {
self.relays_not_found = true;
pub fn status(mut self, output: Output<EventId>) -> Self {
self.status = Some(output);
self.relays_not_found = false;
self
}
@@ -45,14 +46,13 @@ impl SendReport {
self
}
pub fn status(mut self, output: Output<EventId>) -> Self {
self.status = Some(output);
self.relays_not_found = false;
pub fn on_hold(mut self, event: Event) -> Self {
self.on_hold = Some(event);
self
}
pub fn tags(mut self, tags: &Vec<Tag>) -> Self {
self.tags = Some(tags.to_owned());
pub fn not_found(mut self) -> Self {
self.relays_not_found = true;
self
}
@@ -235,7 +235,7 @@ impl Room {
self
}
/// Set the room kind to ongoing
/// Sets this room is ongoing conversation
pub fn set_ongoing(&mut self, cx: &mut Context<Self>) {
if self.kind != RoomKind::Ongoing {
self.kind = RoomKind::Ongoing;
@@ -243,29 +243,29 @@ impl Room {
}
}
/// Checks if the room is a group chat
pub fn is_group(&self) -> bool {
self.members.len() > 2
}
/// Updates the creation timestamp of the room
pub fn created_at(&mut self, created_at: impl Into<Timestamp>, cx: &mut Context<Self>) {
pub fn set_created_at(&mut self, created_at: impl Into<Timestamp>, cx: &mut Context<Self>) {
self.created_at = created_at.into();
cx.notify();
}
/// Updates the subject of the room
pub fn subject(&mut self, subject: String, cx: &mut Context<Self>) {
pub fn set_subject(&mut self, subject: String, cx: &mut Context<Self>) {
self.subject = Some(subject);
cx.notify();
}
/// Updates the picture of the room
pub fn picture(&mut self, picture: String, cx: &mut Context<Self>) {
pub fn set_picture(&mut self, picture: String, cx: &mut Context<Self>) {
self.picture = Some(picture);
cx.notify();
}
/// Checks if the room is a group chat
pub fn is_group(&self) -> bool {
self.members.len() > 2
}
/// Gets the display name for the room
pub fn display_name(&self, cx: &App) -> SharedString {
if let Some(subject) = self.subject.clone() {
@@ -289,13 +289,13 @@ impl Room {
/// Get the first member of the room.
///
/// First member is always different from the current user.
pub(crate) fn first_member(&self, cx: &App) -> Profile {
fn first_member(&self, cx: &App) -> Profile {
let registry = Registry::read_global(cx);
registry.get_person(&self.members[0], cx)
}
/// Merge the names of the first two members of the room.
pub(crate) fn merged_name(&self, cx: &App) -> SharedString {
fn merged_name(&self, cx: &App) -> SharedString {
let registry = Registry::read_global(cx);
if self.is_group() {
@@ -322,43 +322,71 @@ impl Room {
}
}
/// Connects to all members' messaging relays
pub fn connect_relays(
&self,
cx: &App,
) -> Task<Result<HashMap<PublicKey, Vec<RelayUrl>>, Error>> {
/// Connects to all members's messaging relays
pub fn connect(&self, cx: &App) -> Task<Result<HashMap<PublicKey, Vec<RelayUrl>>, Error>> {
let members = self.members.clone();
cx.background_spawn(async move {
let client = nostr_client();
let timeout = Duration::from_secs(3);
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let mut relays = HashMap::new();
let mut processed = HashSet::new();
let mut relays: HashMap<PublicKey, Vec<RelayUrl>> = HashMap::new();
if let Some((_, members)) = members.split_last() {
for member in members.iter() {
relays.insert(member.to_owned(), vec![]);
for member in members.into_iter() {
if member == public_key {
continue;
};
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(member.to_owned())
.limit(1);
relays.insert(member, vec![]);
if let Ok(mut stream) = client.stream_events(filter, timeout).await {
if let Some(event) = stream.next().await {
if processed.insert(event.id) {
let urls = nip17::extract_owned_relay_list(event).collect_vec();
relays.entry(member.to_owned()).or_default().extend(urls);
}
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(member)
.limit(1);
let mut stream = client
.stream_events(filter, Duration::from_secs(10))
.await?;
if let Some(event) = stream.next().await {
if processed.insert(event.id) {
let public_key = event.pubkey;
let urls: Vec<RelayUrl> = nip17::extract_owned_relay_list(event).collect();
// Check if at least one URL exists
if urls.is_empty() {
continue;
}
// Connect to relays
for url in urls.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
relays.entry(public_key).and_modify(|v| v.extend(urls));
}
}
};
}
Ok(relays)
})
}
pub fn disconnect(&self, relays: Vec<RelayUrl>, cx: &App) -> Task<Result<(), Error>> {
cx.background_spawn(async move {
let client = nostr_client();
for relay in relays.into_iter() {
client.disconnect_relay(relay).await?;
}
Ok(())
})
}
/// Loads all messages for this room from the database
pub fn load_messages(&self, cx: &App) -> Task<Result<Vec<Event>, Error>> {
let members = self.members.clone();
@@ -415,23 +443,48 @@ impl Room {
})
}
/// Creates a temporary message for optimistic updates
///
/// The event must not been published to relays.
pub fn create_temp_message(
&self,
receiver: PublicKey,
content: &str,
replies: &[EventId],
) -> UnsignedEvent {
let builder = EventBuilder::private_msg_rumor(receiver, content);
/// Emits a new message signal to the current room
pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event))));
}
/// Emits a signal to refresh the current room's messages.
pub fn emit_refresh(&mut self, cx: &mut Context<Self>) {
cx.emit(RoomSignal::Refresh);
}
/// Create a new message event (unsigned)
pub fn create_message(&self, content: &str, replies: &[EventId], cx: &App) -> UnsignedEvent {
let public_key = Registry::read_global(cx).identity(cx).public_key();
let subject = self.subject.clone();
let picture = self.picture.clone();
let mut tags = vec![];
// Add event reference if it's present (replying to another event)
// Add receivers
//
// NOTE: current user will be removed from the list of receivers
for member in self.members.iter() {
tags.push(Tag::public_key(member.to_owned()));
}
// Add subject tag if it's present
if let Some(subject) = subject {
tags.push(Tag::from_standardized(TagStandard::Subject(
subject.to_string(),
)));
}
// Add picture tag if it's present
if let Some(picture) = picture {
tags.push(Tag::custom(TagKind::custom("picture"), vec![picture]));
}
// Add reply/quote tag
if replies.len() == 1 {
tags.push(Tag::event(replies[0]))
} else {
for id in replies.iter() {
for id in replies {
tags.push(Tag::from_standardized(TagStandard::Quote {
event_id: id.to_owned(),
relay_url: None,
@@ -440,26 +493,27 @@ impl Room {
}
}
let mut event = builder.tags(tags).build(receiver);
// Construct a direct message event
//
// WARNING: never send this event to relays
let mut event = EventBuilder::new(Kind::PrivateDirectMessage, content)
.tags(tags)
.build(public_key);
// Ensure event ID is set
// Generate event ID
event.ensure_id();
event
}
/// Create a task to sends a message to all members in the background
pub fn send_in_background(
/// Create a task to send a message to all room members
pub fn send_message(
&self,
content: &str,
replies: Vec<EventId>,
rumor: UnsignedEvent,
backup: bool,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
let content = content.to_owned();
let subject = self.subject.clone();
let picture = self.picture.clone();
let mut public_keys = self.members.clone();
let mut members = self.members.clone();
cx.background_spawn(async move {
let app_state = app_state();
@@ -467,57 +521,26 @@ impl Room {
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let mut tags: Vec<Tag> = public_keys
.iter()
.filter_map(|&this| {
if this != public_key {
Some(Tag::public_key(this))
} else {
None
}
})
.collect();
// Remove the current user's public key from the list of receivers
// Current user will be handled separately
members.retain(|&pk| pk != public_key);
// Add event reference if it's present (replying to another event)
if replies.len() == 1 {
tags.push(Tag::event(replies[0]))
} else {
for id in replies.iter() {
tags.push(Tag::from_standardized(TagStandard::Quote {
event_id: id.to_owned(),
relay_url: None,
public_key: None,
}))
}
}
let mut reports: Vec<SendReport> = vec![];
// Add subject tag if it's present
if let Some(subject) = subject {
tags.push(Tag::from_standardized(TagStandard::Subject(
subject.to_string(),
)));
}
for receiver in members.into_iter() {
let rumor = rumor.clone();
let event = EventBuilder::gift_wrap(&signer, &receiver, rumor, vec![]).await?;
// Add picture tag if it's present
if let Some(picture) = picture {
tags.push(Tag::custom(TagKind::custom("picture"), vec![picture]));
}
let Ok(relay_urls) = Self::messaging_relays(receiver).await else {
reports.push(SendReport::new(receiver).not_found());
continue;
};
// Remove the current public key from the list of receivers
public_keys.retain(|&pk| pk != public_key);
// Stored all send errors
let mut reports = vec![];
for pubkey in public_keys.into_iter() {
match client
.send_private_msg(pubkey, &content, tags.clone())
.await
{
match client.send_event_to(relay_urls, &event).await {
Ok(output) => {
let id = output.id().to_owned();
let auth_required = output.failed.iter().any(|m| m.1.starts_with("auth-"));
let report = SendReport::new(pubkey).status(output).tags(&tags);
let report = SendReport::new(receiver).status(output);
if auth_required {
// Wait for authenticated and resent event successfully
@@ -526,7 +549,7 @@ impl Room {
// Check if event was successfully resent
if let Some(output) = ids.iter().find(|e| e.id() == &id).cloned() {
let output = SendReport::new(pubkey).status(output).tags(&tags);
let output = SendReport::new(receiver).status(output);
reports.push(output);
break;
}
@@ -544,33 +567,31 @@ impl Room {
}
}
Err(e) => {
if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e {
reports.push(SendReport::new(pubkey).not_found().tags(&tags));
} else {
reports.push(SendReport::new(pubkey).error(e.to_string()).tags(&tags));
}
reports.push(SendReport::new(receiver).error(e.to_string()));
}
}
}
// Construct a gift wrap to back up to current user's owned messaging relays
let rumor = rumor.clone();
let event = EventBuilder::gift_wrap(&signer, &public_key, rumor, vec![]).await?;
// Only send a backup message to current user if sent successfully to others
if reports.iter().all(|r| r.is_sent_success()) && backup {
match client
.send_private_msg(public_key, &content, tags.clone())
.await
{
Ok(output) => {
reports.push(SendReport::new(public_key).status(output).tags(&tags));
}
Err(e) => {
if let nostr_sdk::client::Error::PrivateMsgRelaysNotFound = e {
reports.push(SendReport::new(public_key).not_found());
} else {
reports
.push(SendReport::new(public_key).error(e.to_string()).tags(&tags));
if let Ok(relay_urls) = Self::messaging_relays(public_key).await {
match client.send_event_to(relay_urls, &event).await {
Ok(output) => {
reports.push(SendReport::new(public_key).status(output));
}
Err(e) => {
reports.push(SendReport::new(public_key).error(e.to_string()));
}
}
} else {
reports.push(SendReport::new(public_key).not_found());
}
} else {
reports.push(SendReport::new(public_key).on_hold(event));
}
Ok(reports)
@@ -578,19 +599,19 @@ impl Room {
}
/// Create a task to resend a failed message
pub fn resend(
pub fn resend_message(
&self,
reports: Vec<SendReport>,
message: String,
backup: bool,
cx: &App,
) -> Task<Result<Vec<SendReport>, Error>> {
cx.background_spawn(async move {
let client = nostr_client();
let mut resend_reports = vec![];
let mut resend_tag = vec![];
for report in reports.into_iter() {
let receiver = report.receiver;
// Process failed events
if let Some(output) = report.status {
let id = output.id();
let urls: Vec<&RelayUrl> = output.failed.keys().collect();
@@ -599,44 +620,68 @@ impl Room {
for url in urls.into_iter() {
let relay = client.pool().relay(url).await?;
let id = relay.send_event(&event).await?;
let resent: Output<EventId> = Output {
val: id,
success: HashSet::from([url.to_owned()]),
failed: HashMap::new(),
};
resend_reports.push(SendReport::new(report.receiver).status(resent));
}
if let Some(tags) = report.tags {
resend_tag.extend(tags);
resend_reports.push(SendReport::new(receiver).status(resent));
}
}
}
}
// Only send a backup message to current user if sent successfully to others
if backup && !resend_reports.is_empty() {
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let output = client
.send_private_msg(public_key, message, resend_tag)
.await?;
resend_reports.push(SendReport::new(public_key).status(output));
// Process the on hold event if it exists
if let Some(event) = report.on_hold {
if let Ok(relay_urls) = Self::messaging_relays(receiver).await {
match client.send_event_to(relay_urls, &event).await {
Ok(output) => {
resend_reports.push(SendReport::new(receiver).status(output));
}
Err(e) => {
resend_reports.push(SendReport::new(receiver).error(e.to_string()));
}
}
} else {
resend_reports.push(SendReport::new(receiver).not_found());
}
}
}
Ok(resend_reports)
})
}
/// Emits a new message signal to the current room
pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event))));
}
/// Gets messaging relays for public key
async fn messaging_relays(public_key: PublicKey) -> Result<Vec<RelayUrl>, Error> {
let client = nostr_client();
let mut relay_urls = vec![];
/// Emits a signal to refresh the current room's messages.
pub fn emit_refresh(&mut self, cx: &mut Context<Self>) {
cx.emit(RoomSignal::Refresh);
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
if let Some(event) = client.database().query(filter).await?.first_owned() {
let urls: Vec<RelayUrl> = nip17::extract_owned_relay_list(event).collect();
// Check if at least one URL exists
if urls.is_empty() {
return Err(anyhow!("Not found"));
}
// Connect to relays
for url in urls.iter() {
client.add_relay(url).await?;
client.connect_relay(url).await?;
}
relay_urls.extend(urls);
} else {
return Err(anyhow!("Not found"));
}
Ok(relay_urls)
}
}