This commit is contained in:
2026-06-05 12:16:13 +07:00
parent a0d76e2cf4
commit d53e9d538c
5 changed files with 101 additions and 179 deletions

View File

@@ -7,7 +7,6 @@ use std::time::Duration;
use anyhow::{Context as AnyhowContext, Error, anyhow};
use common::EventExt;
use device::{DeviceEvent, DeviceRegistry};
use fuzzy_matcher::FuzzyMatcher;
use fuzzy_matcher::skim::SkimMatcherV2;
use gpui::{
@@ -49,6 +48,10 @@ pub enum ChatEvent {
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Signal {
/// Inbox Relays found, the app is ready to subscribe messages
InboxReady(Box<Event>),
/// No Inbox Relays found, the app is not ready to subscribe messages
InboxRelayNotFound,
/// Message received from relay pool
Message(NewMessage),
/// Eose received from relay pool
@@ -62,6 +65,14 @@ impl Signal {
Self::Message(NewMessage::new(gift_wrap, rumor))
}
pub fn inbox_ready(event: &Event) -> Self {
Self::InboxReady(Box::new(event.to_owned()))
}
pub fn inbox_relay_not_found() -> Self {
Self::InboxRelayNotFound
}
pub fn eose() -> Self {
Self::Eose
}
@@ -80,9 +91,6 @@ type GiftWrapId = EventId;
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
/// Whether the chat registry is currently initializing.
pub initializing: bool,
/// Chat rooms
rooms: Vec<Entity<Room>>,
@@ -127,61 +135,29 @@ impl ChatRegistry {
/// Create a new chat registry instance
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let nostr = NostrRegistry::global(cx);
let device = DeviceRegistry::global(cx);
let (tx, rx) = flume::unbounded::<Signal>();
let mut subscriptions = smallvec![];
subscriptions.push(
// Subscribe to the signer event
cx.subscribe_in(&nostr, window, |this, state, event, window, cx| {
cx.subscribe(&nostr, |this, _state, event, cx| {
if event == &StateEvent::SignerSet {
this.reset(cx);
this.get_contact_list(cx);
this.get_metadata(cx);
this.get_rooms(cx);
let signer = state.read(cx).signer();
cx.spawn_in(window, async move |this, cx| {
let user_signer = signer.get().await;
this.update(cx, |this, cx| {
this.get_messages(user_signer, cx);
})
.ok();
})
.detach();
};
}),
);
subscriptions.push(
// Subscribe to the device event
cx.subscribe_in(&device, window, |_this, _s, event, window, cx| {
if event == &DeviceEvent::Set {
let nostr = NostrRegistry::global(cx);
let signer = nostr.read(cx).signer();
cx.spawn_in(window, async move |this, cx| {
if let Some(device_signer) = signer.get_encryption_signer().await {
this.update(cx, |this, cx| {
this.get_messages(device_signer, cx);
})
.ok();
}
})
.detach();
};
}),
);
// Run at the end of the current cycle
cx.defer_in(window, |this, _window, cx| {
this.get_rooms(cx);
this.handle_notifications(cx);
this.tracking(cx);
this.get_rooms(cx);
});
Self {
initializing: true,
rooms: vec![],
trashes: cx.new(|_| BTreeSet::default()),
seens: Arc::new(RwLock::new(HashMap::default())),
@@ -238,6 +214,15 @@ impl ChatRegistry {
continue;
}
// Handle msg relays event to determine when the app is ready to subscribe
if event.kind == Kind::InboxRelays {
let current_user = signer.get_public_key().await?;
if event.pubkey == current_user {
tx.send_async(Signal::inbox_ready(&event)).await?;
}
}
// Skip non-gift wrap events
if event.kind != Kind::GiftWrap {
continue;
@@ -253,17 +238,9 @@ impl ChatRegistry {
event_map.insert(rumor.id.unwrap(), (event.id, dekey));
}
if rumor.kind != Kind::PrivateDirectMessage
|| rumor.kind != Kind::Custom(15)
{
log::info!("Rumor is not releated to NIP17");
continue;
}
// Check if the rumor has a recipient
if rumor.tags.is_empty() {
let signal =
Signal::error(event.as_ref(), "Recipient is missing");
let signal = Signal::error(&event, "Recipient is missing");
tx.send_async(signal).await?;
}
@@ -303,6 +280,16 @@ impl ChatRegistry {
this.new_message(message, cx);
})?;
}
Signal::InboxReady(event) => {
this.update(cx, |this, cx| {
this.get_messages(&event, cx);
})?;
}
Signal::InboxRelayNotFound => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error("Messaging Relays not found".into()));
})?;
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
@@ -342,7 +329,7 @@ impl ChatRegistry {
}
/// Get contact list from relays
fn get_contact_list(&mut self, cx: &mut Context<Self>) {
fn get_metadata(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
@@ -351,96 +338,63 @@ impl ChatRegistry {
return;
};
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let id = SubscriptionId::new("contact-list");
self.tasks.push(cx.background_spawn(async move {
let opts = SubscribeAutoCloseOptions::default()
.exit_policy(ReqExitPolicy::ExitOnEOSE)
.timeout(Some(Duration::from_secs(TIMEOUT)));
// Construct filter for inbox relays
let filter = Filter::new()
// Construct filter for msg relays
let msg_relays = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
// Construct filter for contact list
let contact_list = Filter::new()
.kind(Kind::ContactList)
.author(public_key)
.limit(1);
// Subscribe
client.subscribe(filter).close_on(opts).with_id(id).await?;
client
.subscribe(vec![msg_relays, contact_list])
.close_on(opts)
.await?;
Ok(())
});
}));
self.tasks.push(task);
}
let client = nostr.read(cx).client();
let tx = self.signal_tx.clone();
/// Get all messages for the provided signer
fn get_messages<T>(&mut self, signer: T, cx: &mut Context<Self>)
where
T: NostrSigner + 'static,
{
let task = self.subscribe_gift_wrap_events(signer, cx);
self.tasks.push(cx.background_spawn(async move {
loop {
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
self.tasks.push(cx.spawn(async move |this, cx| {
match task.await {
Ok(_) => {
this.update(cx, |this, cx| {
this.set_initializing(false, cx);
})?;
}
Err(e) => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error(SharedString::from(e.to_string())));
})?;
if client.database().query(filter).await?.first().is_some() {
break;
} else {
tx.send_async(Signal::inbox_relay_not_found()).await?;
}
smol::Timer::after(Duration::from_secs(5)).await;
}
Ok(())
}));
}
// Get messaging relay list for current user
fn get_messaging_relays(&self, cx: &App) -> Task<Result<Vec<RelayUrl>, Error>> {
/// Get all messages for the provided signer
fn get_messages(&mut self, msg_relays: &Event, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let urls: Vec<RelayUrl> = nip17::extract_relay_list(msg_relays).cloned().collect();
cx.background_spawn(async move {
let public_key = signer.get_public_key().await?;
let id = SubscriptionId::new("inbox-relay");
// Construct filter for inbox relays
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
// Stream events from user's write relays
let mut stream = client
.stream_events(filter)
.with_id(id)
.timeout(Duration::from_secs(TIMEOUT))
.await?;
while let Some((_url, res)) = stream.next().await {
if let Ok(event) = res {
let urls: Vec<RelayUrl> = nip17::extract_owned_relay_list(event).collect();
return Ok(urls);
}
}
Err(anyhow!("Messaging Relays not found"))
})
}
/// Continuously get gift wrap events for the signer
fn subscribe_gift_wrap_events<T>(&self, signer: T, cx: &App) -> Task<Result<(), Error>>
where
T: NostrSigner + 'static,
{
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let urls = self.get_messaging_relays(cx);
cx.background_spawn(async move {
let urls = urls.await?;
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let public_key = signer.get_public_key().await?;
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
let id = SubscriptionId::new(format!("{}-msg", public_key.to_hex()));
@@ -464,38 +418,23 @@ impl ChatRegistry {
);
Ok(())
})
});
self.tasks.push(cx.spawn(async move |this, cx| {
if let Err(e) = task.await {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error(SharedString::from(e.to_string())));
})?;
}
Ok(())
}));
}
/// Refresh the chat registry, fetching messages and contact list from relays.
pub fn refresh(&mut self, window: &mut Window, cx: &mut Context<Self>) {
pub fn refresh(&mut self, cx: &mut Context<Self>) {
self.reset(cx);
self.get_contact_list(cx);
self.get_metadata(cx);
self.get_rooms(cx);
let nostr = NostrRegistry::global(cx);
let signer = nostr.read(cx).signer();
cx.spawn_in(window, async move |this, cx| {
let user_signer = signer.get().await;
let device_signer = signer.get_encryption_signer().await;
this.update(cx, |this, cx| {
this.get_messages(user_signer, cx);
if let Some(device_signer) = device_signer {
this.get_messages(device_signer, cx);
}
})
.ok();
})
.detach();
}
/// Set the initializing status of the chat registry
fn set_initializing(&mut self, initializing: bool, cx: &mut Context<Self>) {
self.initializing = initializing;
cx.notify();
}
/// Get the loading status of the chat registry
@@ -650,7 +589,6 @@ impl ChatRegistry {
/// Reset the registry.
pub fn reset(&mut self, cx: &mut Context<Self>) {
self.initializing = true;
self.rooms.clear();
self.trashes.update(cx, |this, cx| {
this.clear();

View File

@@ -217,8 +217,6 @@ impl RelayAuth {
.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))
.await?;
log::info!("Sending AUTH event");
while let Some(notification) = notifications.next().await {
match notification {
RelayNotification::Message { message } => {
@@ -272,29 +270,24 @@ impl RelayAuth {
this.update_in(cx, |this, window, cx| {
window.clear_notification_by_id::<AuthNotification>(challenge, cx);
match result {
Ok(_) => {
// Clear pending events for the authenticated relay
this.clear_pending_events(url, cx);
if let Err(e) = result {
window
.push_notification(Notification::error(e.to_string()).autohide(false), cx);
} else {
// Clear pending events for the authenticated relay
this.clear_pending_events(url, cx);
// Only show the success notification if the relay was not already trusted
if !settings.read(cx).trusted_relay(url, cx) {
let domain = url.domain().unwrap_or_default();
let msg = format!("Relay {} has been authenticated", domain);
window.push_notification(Notification::success(msg), cx);
} else {
// Save the authenticated relay to automatically authenticate future requests
settings.update(cx, |this, cx| {
this.add_trusted_relay(url, cx);
});
window.push_notification(
Notification::success(format!(
"Relay {} has been authenticated",
url.domain().unwrap_or_default()
)),
cx,
);
}
Err(e) => {
window.push_notification(
Notification::error(e.to_string()).autohide(false),
cx,
);
}
}
})

View File

@@ -13,13 +13,13 @@ use nostr_sdk::prelude::*;
mod blossom;
mod constants;
mod device;
mod nip05;
mod nip4e;
mod signer;
pub use blossom::*;
pub use constants::*;
pub use device::*;
pub use nip4e::*;
pub use nip05::*;
pub use signer::*;
@@ -149,11 +149,12 @@ impl NostrRegistry {
// Run at the end of current cycle
cx.defer_in(window, |this, _window, cx| {
this.connect(cx);
// Create an identity if none exists
if this.npubs.read(cx).is_empty() {
// Create an identity if none exists
this.create_identity(cx);
} else {
// Show the identity dialog
// Show the account selector dialog
cx.emit(StateEvent::Show);
}
});
@@ -234,10 +235,7 @@ impl NostrRegistry {
}
// Connect to all added relays
client
.connect()
.and_wait(Duration::from_secs(TIMEOUT))
.await;
client.connect().await;
Ok(())
});

View File

@@ -353,7 +353,7 @@ impl Workspace {
let chat = ChatRegistry::global(cx);
// Trigger a refresh of the chat registry
chat.update(cx, |this, cx| {
this.refresh(window, cx);
this.refresh(cx);
});
}
Command::ShowRelayList => {
@@ -639,7 +639,6 @@ impl Workspace {
fn titlebar_right(&mut self, cx: &mut Context<Self>) -> impl IntoElement {
let chat = ChatRegistry::global(cx);
let initializing = chat.read(cx).initializing;
let trash_messages = chat.read(cx).count_trash_messages(cx);
let is_nip4e_enabled = AppSettings::get_encryption_key(cx);
@@ -767,12 +766,6 @@ impl Workspace {
.icon(IconName::Inbox)
.small()
.ghost()
.loading(initializing)
.when(initializing, |this| {
this.label("Inbox")
.xsmall()
.tooltip("Getting inbox messages...")
})
.dropdown_menu(move |this, _window, cx| {
let urls: Vec<(SharedString, SharedString)> = profile
.messaging_relays()