feat: detect user dm relays when opening chat panel (#151)

* preconnect to user messaging relays

* .
This commit is contained in:
reya
2025-09-15 19:34:48 +07:00
committed by GitHub
parent cc79f0ed1c
commit d13ffd5a54
5 changed files with 217 additions and 135 deletions

View File

@@ -416,24 +416,31 @@ impl ChatSpace {
}
}
Kind::InboxRelays => {
let relays = nip17::extract_relay_list(&event).collect_vec();
if let Ok(true) = Self::is_self_event(&event).await {
let relays = nip17::extract_relay_list(&event).collect_vec();
if !relays.is_empty() {
for relay in relays.clone().into_iter() {
if client.add_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
css.signal.send(SignalKind::Notice(notice)).await;
}
if client.connect_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
css.signal.send(SignalKind::Notice(notice)).await;
if !relays.is_empty() {
for relay in relays.clone().into_iter() {
if client.add_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
css.signal.send(SignalKind::Notice(notice)).await;
}
if client.connect_relay(relay).await.is_err() {
let notice = Notice::RelayFailed(relay.clone());
css.signal.send(SignalKind::Notice(notice)).await;
}
}
// Subscribe to gift wrap events only in the current user's NIP-17 relays
Self::fetch_gift_wrap(relays, event.pubkey).await;
} else {
css.signal.send(SignalKind::RelaysNotFound).await;
}
// Subscribe to gift wrap events only in the current user's NIP-17 relays
Self::fetch_gift_wrap(relays, event.pubkey).await;
} else {
css.signal.send(SignalKind::RelaysNotFound).await;
for (id, relays) in client.subscriptions().await {
log::info!("sub id: {id:?}");
log::info!("relays: {relays:?}");
}
}
}
Kind::ContactList => {

View File

@@ -1,3 +1,5 @@
use std::collections::HashMap;
use anyhow::anyhow;
use common::display::{ReadableProfile, ReadableTimestamp};
use common::nip96::nip96_upload;
@@ -49,6 +51,9 @@ pub fn init(room: Entity<Room>, window: &mut Window, cx: &mut App) -> Entity<Cha
pub struct Chat {
// Chat Room
room: Entity<Room>,
relays: Entity<HashMap<PublicKey, Vec<RelayUrl>>>,
// Messages
list_state: ListState,
messages: BTreeSet<Message>,
rendered_texts_by_id: BTreeMap<EventId, RenderedText>,
@@ -67,14 +72,20 @@ pub struct Chat {
focus_handle: FocusHandle,
image_cache: Entity<RetainAllImageCache>,
_subscriptions: SmallVec<[Subscription; 2]>,
_tasks: SmallVec<[Task<()>; 1]>,
_subscriptions: SmallVec<[Subscription; 4]>,
_tasks: SmallVec<[Task<()>; 2]>,
}
impl Chat {
pub fn new(room: Entity<Room>, window: &mut Window, cx: &mut Context<Self>) -> Self {
let attachments = cx.new(|_| vec![]);
let replies_to = cx.new(|_| vec![]);
let relays = cx.new(|_| {
let this: HashMap<PublicKey, Vec<RelayUrl>> = HashMap::new();
this
});
let input = cx.new(|cx| {
InputState::new(window, cx)
.placeholder(t!("chat.placeholder"))
@@ -89,11 +100,35 @@ impl Chat {
let messages = BTreeSet::from([Message::system()]);
let list_state = ListState::new(messages.len(), ListAlignment::Bottom, px(1024.));
let connect_relays = room.read(cx).connect_relays(cx);
let load_messages = room.read(cx).load_messages(cx);
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
tasks.push(
// Load all messages belonging to this room
cx.spawn_in(window, async move |this, cx| {
match connect_relays.await {
Ok(relays) => {
this.update(cx, |this, cx| {
this.relays.update(cx, |this, cx| {
*this = relays;
cx.notify();
});
})
.ok();
}
Err(e) => {
cx.update(|window, cx| {
window.push_notification(e.to_string(), cx);
})
.ok();
}
};
}),
);
tasks.push(
// Load all messages belonging to this room
cx.spawn_in(window, async move |this, cx| {
@@ -139,7 +174,7 @@ impl Chat {
match signal {
RoomSignal::NewMessage((gift_wrap_id, event)) => {
if !this.is_sent_by_coop(gift_wrap_id) {
this.insert_message(event, false, cx);
this.insert_message(Message::user(event), false, cx);
}
}
RoomSignal::Refresh => {
@@ -149,6 +184,33 @@ impl Chat {
}),
);
subscriptions.push(
// Observe the messaging relays of the room's members
cx.observe_in(&relays, window, |this, entity, _window, cx| {
for (public_key, urls) in entity.read(cx).clone().into_iter() {
if urls.is_empty() {
let profile = Registry::read_global(cx).get_person(&public_key, cx);
let content = t!("chat.nip17_not_found", u = profile.name());
this.insert_warning(content, cx);
}
}
}),
);
subscriptions.push(
// Observe when user close chat panel
cx.on_release_in(window, move |this, window, cx| {
this.disconnect_relays(cx);
this.messages.clear();
this.rendered_texts_by_id.clear();
this.reports_by_id.clear();
this.image_cache.update(cx, |this, cx| {
this.clear(window, cx);
});
}),
);
Self {
id: room.read(cx).id.to_string().into(),
image_cache: RetainAllImageCache::new(cx),
@@ -156,6 +218,7 @@ impl Chat {
uploading: false,
rendered_texts_by_id: BTreeMap::new(),
reports_by_id: BTreeMap::new(),
relays,
messages,
room,
list_state,
@@ -167,6 +230,20 @@ impl Chat {
}
}
/// Disconnect all relays when the user closes the chat panel
fn disconnect_relays(&mut self, cx: &mut App) {
let relays = self.relays.read(cx).clone();
cx.background_spawn(async move {
let client = nostr_client();
for relay in relays.values().flatten() {
client.disconnect_relay(relay).await.ok();
}
})
.detach();
}
/// Load all messages belonging to this room
fn load_messages(&mut self, window: &mut Window, cx: &mut Context<Self>) {
let load_messages = self.room.read(cx).load_messages(cx);
@@ -260,7 +337,7 @@ impl Chat {
cx.defer_in(window, |this, window, cx| {
// Optimistically update message list
this.insert_message(temp_message, true, cx);
this.insert_message(Message::user(temp_message), true, cx);
// Remove all replies
this.remove_all_replies(cx);
@@ -339,6 +416,41 @@ impl Chat {
}
}
/// Insert a message into the chat panel
fn insert_message<E>(&mut self, m: E, scroll: bool, cx: &mut Context<Self>)
where
E: Into<Message>,
{
let old_len = self.messages.len();
// Extend the messages list with the new events
if self.messages.insert(m.into()) {
self.list_state.splice(old_len..old_len, 1);
if scroll {
self.list_state.scroll_to(ListOffset {
item_ix: self.list_state.item_count(),
offset_in_item: px(0.0),
});
cx.notify();
}
}
}
/// Convert and insert a vector of nostr events into the chat panel
fn insert_messages(&mut self, events: Vec<Event>, cx: &mut Context<Self>) {
for event in events.into_iter() {
let m = Message::user(event);
self.insert_message(m, false, cx);
}
cx.notify();
}
fn insert_warning(&mut self, content: impl Into<String>, cx: &mut Context<Self>) {
let m = Message::warning(content.into());
self.insert_message(m, true, cx);
}
/// Check if a message failed to send by its ID
fn is_sent_failed(&self, id: &EventId) -> bool {
self.reports_by_id
@@ -370,35 +482,6 @@ impl Chat {
})
}
/// Convert and insert a nostr event into the chat panel
fn insert_message<E>(&mut self, event: E, scroll: bool, cx: &mut Context<Self>)
where
E: Into<RenderedMessage>,
{
let old_len = self.messages.len();
// Extend the messages list with the new events
if self.messages.insert(Message::user(event)) {
self.list_state.splice(old_len..old_len, 1);
if scroll {
self.list_state.scroll_to(ListOffset {
item_ix: self.list_state.item_count(),
offset_in_item: px(0.0),
});
cx.notify();
}
}
}
/// Convert and insert a vector of nostr events into the chat panel
fn insert_messages(&mut self, events: Vec<Event>, cx: &mut Context<Self>) {
for event in events.into_iter() {
self.insert_message(event, false, cx);
}
cx.notify();
}
fn profile(&self, public_key: &PublicKey, cx: &Context<Self>) -> Profile {
let registry = Registry::read_global(cx);
registry.get_person(public_key, cx)
@@ -557,6 +640,23 @@ impl Chat {
.into_any_element()
}
fn render_warning(&mut self, ix: usize, content: String, cx: &mut Context<Self>) -> AnyElement {
div()
.id(ix)
.w_full()
.py_1()
.px_3()
.child(
h_flex()
.gap_3()
.text_sm()
.text_color(cx.theme().warning_foreground)
.child(Avatar::new("brand/avatar.png").size(rems(2.)))
.child(SharedString::from(content)),
)
.into_any_element()
}
fn render_message_not_found(&self, ix: usize, cx: &Context<Self>) -> AnyElement {
div()
.id(ix)
@@ -604,8 +704,7 @@ impl Chat {
.py_1()
.px_3()
.child(
div()
.flex()
h_flex()
.gap_3()
.when(!hide_avatar, |this| {
this.child(Avatar::new(author.avatar_url(proxy)).size(rems(2.)))
@@ -617,9 +716,7 @@ impl Chat {
.flex_initial()
.overflow_hidden()
.child(
div()
.flex()
.items_center()
h_flex()
.gap_2()
.text_sm()
.text_color(cx.theme().text_placeholder)
@@ -1267,6 +1364,9 @@ impl Render for Chat {
this.render_message(ix, rendered, text, cx)
}
Message::Warning(content, _) => {
this.render_warning(ix, content.to_owned(), cx)
}
Message::System(_) => this.render_announcement(ix, cx),
}
} else {

View File

@@ -29,10 +29,9 @@ pub struct Screening {
profile: Profile,
verified: bool,
followed: bool,
dm_relays: Option<bool>,
last_active: Option<Timestamp>,
mutual_contacts: Vec<Profile>,
_tasks: SmallVec<[Task<()>; 4]>,
_tasks: SmallVec<[Task<()>; 3]>,
}
impl Screening {
@@ -83,24 +82,6 @@ impl Screening {
activity
});
let relay_check = cx.background_spawn(async move {
let client = nostr_client();
let mut relay = false;
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
if let Ok(mut stream) = client.stream_events(filter, Duration::from_secs(2)).await {
while stream.next().await.is_some() {
relay = true
}
}
relay
});
let addr_check = if let Some(address) = profile.metadata().nip05 {
Some(Tokio::spawn(cx, async move {
nip05_verify(public_key, &address).await.unwrap_or(false)
@@ -136,19 +117,6 @@ impl Screening {
}),
);
tasks.push(
// Run the relay check in the background
cx.spawn_in(window, async move |this, cx| {
let relay = relay_check.await;
this.update(cx, |this, cx| {
this.dm_relays = Some(relay);
cx.notify();
})
.ok();
}),
);
tasks.push(
// Run the NIP-05 verification in the background
cx.spawn_in(window, async move |this, cx| {
@@ -168,7 +136,6 @@ impl Screening {
profile,
verified: false,
followed: false,
dm_relays: None,
last_active: None,
mutual_contacts: vec![],
_tasks: tasks,
@@ -456,37 +423,6 @@ impl Render for Screening {
}),
),
),
)
.child(
h_flex()
.items_start()
.gap_2()
.child(status_badge(self.dm_relays, cx))
.child(
v_flex()
.w_full()
.text_sm()
.child({
if self.dm_relays == Some(true) {
shared_t!("screening.relay_found")
} else {
shared_t!("screening.relay_empty")
}
})
.child(
div()
.w_full()
.line_clamp(1)
.text_color(cx.theme().text_muted)
.child({
if self.dm_relays == Some(true) {
shared_t!("screening.relay_found_desc")
} else {
shared_t!("screening.relay_empty_desc")
}
}),
),
),
),
)
}

View File

@@ -5,6 +5,7 @@ use nostr_sdk::prelude::*;
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum Message {
User(RenderedMessage),
Warning(String, Timestamp),
System(Timestamp),
}
@@ -13,6 +14,10 @@ impl Message {
Self::User(user.into())
}
pub fn warning(content: String) -> Self {
Self::Warning(content, Timestamp::now())
}
pub fn system() -> Self {
Self::System(Timestamp::default())
}
@@ -25,6 +30,11 @@ impl Ord for Message {
(Message::System(a), Message::System(b)) => a.cmp(b),
(Message::User(a), Message::System(b)) => a.created_at.cmp(b),
(Message::System(a), Message::User(b)) => a.cmp(&b.created_at),
(Message::Warning(_, a), Message::Warning(_, b)) => a.cmp(b),
(Message::Warning(_, a), Message::User(b)) => a.cmp(&b.created_at),
(Message::User(a), Message::Warning(_, b)) => a.created_at.cmp(b),
(Message::Warning(_, a), Message::System(b)) => a.cmp(b),
(Message::System(a), Message::Warning(_, b)) => a.cmp(b),
}
}
}

View File

@@ -331,15 +331,44 @@ impl Room {
}
}
/// Connects to all members' messaging relays
pub fn connect_relays(
&self,
cx: &App,
) -> Task<Result<HashMap<PublicKey, Vec<RelayUrl>>, Error>> {
let members = self.members.clone();
cx.background_spawn(async move {
let client = nostr_client();
let timeout = Duration::from_secs(3);
let mut processed = HashSet::new();
let mut relays: HashMap<PublicKey, Vec<RelayUrl>> = HashMap::new();
if let Some((_, members)) = members.split_last() {
for member in members.iter() {
relays.insert(member.to_owned(), vec![]);
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(member.to_owned())
.limit(1);
if let Ok(mut stream) = client.stream_events(filter, timeout).await {
if let Some(event) = stream.next().await {
if processed.insert(event.id) {
let urls = nip17::extract_owned_relay_list(event).collect_vec();
relays.entry(member.to_owned()).or_default().extend(urls);
}
}
}
}
};
Ok(relays)
})
}
/// Loads all messages for this room from the database
///
/// # Arguments
///
/// * `cx` - The App context
///
/// # Returns
///
/// A Task that resolves to Result<Vec<Event>, Error> containing all messages for this room
pub fn load_messages(&self, cx: &App) -> Task<Result<Vec<Event>, Error>> {
let members = self.members.clone();
@@ -365,16 +394,6 @@ impl Room {
})
}
/// Emits a new message signal to the current room
pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event))));
}
/// Emits a signal to refresh the current room's messages.
pub fn emit_refresh(&mut self, cx: &mut Context<Self>) {
cx.emit(RoomSignal::Refresh);
}
/// Creates a temporary message for optimistic updates
///
/// The event must not been published to relays.
@@ -593,4 +612,14 @@ impl Room {
Ok(resend_reports)
})
}
/// Emits a new message signal to the current room
pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event))));
}
/// Emits a signal to refresh the current room's messages.
pub fn emit_refresh(&mut self, cx: &mut Context<Self>) {
cx.emit(RoomSignal::Refresh);
}
}