This commit is contained in:
2026-06-05 13:31:30 +07:00
parent d53e9d538c
commit c791309659
4 changed files with 71 additions and 69 deletions

View File

@@ -16,7 +16,7 @@ use gpui::{
use nostr_sdk::prelude::*;
use smallvec::{SmallVec, smallvec};
use smol::lock::RwLock;
use state::{CoopSigner, DEVICE_GIFTWRAP, NostrRegistry, StateEvent, TIMEOUT, USER_GIFTWRAP};
use state::{CoopSigner, DEVICE_GIFTWRAP, NostrRegistry, StateEvent, USER_GIFTWRAP};
mod message;
mod room;
@@ -41,6 +41,8 @@ pub enum ChatEvent {
CloseRoom(u64),
/// An event to notify UI about a new chat request
Ping,
/// No Inbox Relays found, the app is not ready to subscribe messages
InboxRelayNotFound,
/// An error occurred
Error(SharedString),
}
@@ -85,9 +87,6 @@ impl Signal {
}
}
type Dekey = bool;
type GiftWrapId = EventId;
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
@@ -101,10 +100,13 @@ pub struct ChatRegistry {
seens: Arc<RwLock<HashMap<EventId, HashSet<RelayUrl>>>>,
/// Mapping of unwrapped event ids to their gift wrap event ids
event_map: Arc<RwLock<HashMap<EventId, (GiftWrapId, Dekey)>>>,
event_map: Arc<RwLock<HashMap<EventId, EventId>>>,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
tracking: Arc<AtomicBool>,
/// Whether the messaging relays have been found.
msg_relays_existed: Arc<AtomicBool>,
/// Channel for sending signals to the UI.
signal_tx: flume::Sender<Signal>,
@@ -162,7 +164,8 @@ impl ChatRegistry {
trashes: cx.new(|_| BTreeSet::default()),
seens: Arc::new(RwLock::new(HashMap::default())),
event_map: Arc::new(RwLock::new(HashMap::default())),
tracking_flag: Arc::new(AtomicBool::new(false)),
tracking: Arc::new(AtomicBool::new(false)),
msg_relays_existed: Arc::new(AtomicBool::new(false)),
signal_rx: rx,
signal_tx: tx,
tasks: smallvec![],
@@ -175,7 +178,10 @@ impl ChatRegistry {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let status = self.tracking_flag.clone();
let tracking = self.tracking.clone();
let msg_relays_existed = self.msg_relays_existed.clone();
let seens = self.seens.clone();
let event_map = self.event_map.clone();
let trashes = self.trashes.downgrade();
@@ -199,10 +205,7 @@ impl ChatRegistry {
};
match *message {
RelayMessage::Event {
event,
subscription_id,
} => {
RelayMessage::Event { event, .. } => {
// Keep track of which relays have seen this event
{
let mut seens = seens.write().await;
@@ -219,6 +222,9 @@ impl ChatRegistry {
let current_user = signer.get_public_key().await?;
if event.pubkey == current_user {
// Mark that the msg relays have been found
msg_relays_existed.store(true, Ordering::Release);
// Emit the inbox ready signal
tx.send_async(Signal::inbox_ready(&event)).await?;
}
}
@@ -234,8 +240,7 @@ impl ChatRegistry {
// Map the rumor id to the gift wrap event id for later lookup
{
let mut event_map = event_map.write().await;
let dekey = subscription_id.as_ref() == &sub_id1;
event_map.insert(rumor.id.unwrap(), (event.id, dekey));
event_map.insert(rumor.id.unwrap(), event.id);
}
// Check if the rumor has a recipient
@@ -250,7 +255,7 @@ impl ChatRegistry {
tx.send_async(signal).await?;
} else {
// Mark the chat still processing new messages
status.store(true, Ordering::Release);
tracking.store(true, Ordering::Release);
}
}
Err(e) => {
@@ -287,7 +292,7 @@ impl ChatRegistry {
}
Signal::InboxRelayNotFound => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error("Messaging Relays not found".into()));
cx.emit(ChatEvent::InboxRelayNotFound);
})?;
}
Signal::Eose => {
@@ -310,7 +315,7 @@ impl ChatRegistry {
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let status = self.tracking_flag.clone();
let status = self.tracking.clone();
let tx = self.signal_tx.clone();
self.tasks.push(cx.background_spawn(async move {
@@ -328,8 +333,8 @@ impl ChatRegistry {
}));
}
/// Get contact list from relays
fn get_metadata(&mut self, cx: &mut Context<Self>) {
/// Get all necessary metadata from relays for current user
pub fn get_metadata(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
@@ -339,9 +344,7 @@ impl ChatRegistry {
};
self.tasks.push(cx.background_spawn(async move {
let opts = SubscribeAutoCloseOptions::default()
.exit_policy(ReqExitPolicy::ExitOnEOSE)
.timeout(Some(Duration::from_secs(TIMEOUT)));
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
// Construct filter for msg relays
let msg_relays = Filter::new()
@@ -358,29 +361,27 @@ impl ChatRegistry {
// Subscribe
client
.subscribe(vec![msg_relays, contact_list])
.with_id(SubscriptionId::new("user-meta"))
.close_on(opts)
.await?;
Ok(())
}));
let client = nostr.read(cx).client();
let tx = self.signal_tx.clone();
let msg_relays_existed = self.msg_relays_existed.clone();
// Reset the status flag
msg_relays_existed.store(false, Ordering::Release);
// Wait for the msg relays to be found or timeout
self.tasks.push(cx.background_spawn(async move {
loop {
let filter = Filter::new()
.kind(Kind::InboxRelays)
.author(public_key)
.limit(1);
// Wait for 5 seconds
smol::Timer::after(Duration::from_secs(5)).await;
if client.database().query(filter).await?.first().is_some() {
break;
} else {
tx.send_async(Signal::inbox_relay_not_found()).await?;
}
smol::Timer::after(Duration::from_secs(5)).await;
// Then check if the msg relays have been found
if !msg_relays_existed.load(Ordering::Acquire) {
tx.send_async(Signal::inbox_relay_not_found()).await?;
}
Ok(())
@@ -439,7 +440,7 @@ impl ChatRegistry {
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.tracking_flag.load(Ordering::Acquire)
self.tracking.load(Ordering::Acquire)
}
/// Get a weak reference to a room by its ID.
@@ -491,7 +492,7 @@ impl ChatRegistry {
self.event_map
.read_blocking()
.get(id)
.map(|(id, _dekey)| self.seen_on(id))
.map(|id| self.seen_on(id))
}
/// Get the relays that have seen a given gift wrap id.
@@ -503,15 +504,6 @@ impl ChatRegistry {
.unwrap_or_default()
}
/// Check if a given rumor was encrypted by the dekey.
pub fn encrypted_by_dekey(&self, id: &EventId) -> bool {
self.event_map
.read_blocking()
.get(id)
.map(|(_, dekey)| *dekey)
.unwrap_or(false)
}
/// Add a new room to the start of list.
pub fn add_room<I>(&mut self, room: I, cx: &mut Context<Self>)
where