This commit is contained in:
2026-03-09 13:58:49 +07:00
parent aec32e450a
commit 39c04cabad
16 changed files with 475 additions and 554 deletions

View File

@@ -10,11 +10,12 @@ use common::EventUtils;
use fuzzy_matcher::FuzzyMatcher;
use fuzzy_matcher::skim::SkimMatcherV2;
use gpui::{
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity, Window,
App, AppContext, Context, Entity, EventEmitter, Global, SharedString, Subscription, Task,
WeakEntity, Window,
};
use nostr_sdk::prelude::*;
use smallvec::{SmallVec, smallvec};
use state::{DEVICE_GIFTWRAP, NostrRegistry, RelayState, TIMEOUT, USER_GIFTWRAP};
use state::{DEVICE_GIFTWRAP, NostrRegistry, StateEvent, TIMEOUT, USER_GIFTWRAP};
mod message;
mod room;
@@ -39,6 +40,10 @@ pub enum ChatEvent {
CloseRoom(u64),
/// An event to notify UI about a new chat request
Ping,
/// An event to notify UI that the chat registry has subscribed to messaging relays
Subscribed,
/// An error occurred
Error(SharedString),
}
/// Channel signal.
@@ -48,41 +53,25 @@ enum Signal {
Message(NewMessage),
/// Eose received from relay pool
Eose,
}
/// Inbox state.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum InboxState {
#[default]
Idle,
Checking,
RelayNotAvailable,
RelayConfigured(Box<Event>),
Subscribing,
}
impl InboxState {
pub fn not_configured(&self) -> bool {
matches!(self, InboxState::RelayNotAvailable)
}
pub fn subscribing(&self) -> bool {
matches!(self, InboxState::Subscribing)
}
/// An error occurred
Error(SharedString),
}
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
/// Relay state for messaging relay list
state: Entity<InboxState>,
/// Collection of all chat rooms
rooms: Vec<Entity<Room>>,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Channel for sending signals to the UI.
signal_tx: flume::Sender<Signal>,
/// Channel for receiving signals from the UI.
signal_rx: flume::Receiver<Signal>,
/// Async tasks
tasks: SmallVec<[Task<Result<(), Error>>; 2]>,
@@ -105,36 +94,18 @@ impl ChatRegistry {
/// Create a new chat registry instance
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let state = cx.new(|_| InboxState::default());
let nostr = NostrRegistry::global(cx);
let (tx, rx) = flume::unbounded::<Signal>();
let mut subscriptions = smallvec![];
subscriptions.push(
// Observe the nip65 state and load chat rooms on every state change
cx.observe(&nostr, |this, state, cx| {
match state.read(cx).relay_list_state {
RelayState::Idle => {
this.reset(cx);
}
RelayState::Configured => {
this.get_contact_list(cx);
this.ensure_messaging_relays(cx);
}
_ => {}
}
// Load rooms on every state change
this.get_rooms(cx);
}),
);
subscriptions.push(
// Observe the nip17 state and load chat rooms on every state change
cx.observe(&state, |this, state, cx| {
if let InboxState::RelayConfigured(event) = state.read(cx) {
let relay_urls: Vec<_> = nip17::extract_relay_list(event).cloned().collect();
this.get_messages(relay_urls, cx);
// Subscribe to the signer event
cx.subscribe(&nostr, |this, _state, event, cx| {
if let StateEvent::SignerSet = event {
this.reset(cx);
this.get_rooms(cx);
this.get_contact_list(cx);
this.get_messages(cx)
}
}),
);
@@ -147,9 +118,10 @@ impl ChatRegistry {
});
Self {
state,
rooms: vec![],
tracking_flag: Arc::new(AtomicBool::new(false)),
signal_rx: rx,
signal_tx: tx,
tasks: smallvec![],
_subscriptions: subscriptions,
}
@@ -167,7 +139,8 @@ impl ChatRegistry {
let sub_id2 = SubscriptionId::new(USER_GIFTWRAP);
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(1024);
let tx = self.signal_tx.clone();
let rx = self.signal_rx.clone();
self.tasks.push(cx.background_spawn(async move {
let device_signer = signer.get_encryption_signer().await;
@@ -194,19 +167,29 @@ impl ChatRegistry {
// Extract the rumor from the gift wrap event
match extract_rumor(&client, &device_signer, event.as_ref()).await {
Ok(rumor) => match rumor.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
Ok(rumor) => {
if rumor.tags.is_empty() {
let error: SharedString =
"Message doesn't belong to any rooms".into();
tx.send_async(Signal::Error(error)).await?;
}
tx.send_async(signal).await?;
match rumor.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
tx.send_async(signal).await?;
}
false => {
status.store(true, Ordering::Release);
}
}
false => {
status.store(true, Ordering::Release);
}
},
}
Err(e) => {
log::warn!("Failed to unwrap the gift wrap event: {e}");
let error: SharedString =
format!("Failed to unwrap the gift wrap event: {e}").into();
tx.send_async(Signal::Error(error)).await?;
}
}
}
@@ -235,6 +218,11 @@ impl ChatRegistry {
this.get_rooms(cx);
})?;
}
Signal::Error(error) => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error(error));
})?;
}
};
}
@@ -245,6 +233,7 @@ impl ChatRegistry {
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let status = self.tracking_flag.clone();
let tx = self.signal_tx.clone();
self.tasks.push(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(15);
@@ -252,6 +241,9 @@ impl ChatRegistry {
loop {
if status.load(Ordering::Acquire) {
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
_ = tx.send_async(Signal::Eose).await;
} else {
_ = tx.send_async(Signal::Eose).await;
}
smol::Timer::after(loop_duration).await;
}
@@ -289,27 +281,29 @@ impl ChatRegistry {
self.tasks.push(task);
}
/// Ensure messaging relays are set up for the current user.
pub fn ensure_messaging_relays(&mut self, cx: &mut Context<Self>) {
let task = self.verify_relays(cx);
// Set state to checking
self.set_state(InboxState::Checking, cx);
/// Get all messages for current user
fn get_messages(&mut self, cx: &mut Context<Self>) {
let task = self.subscribe(cx);
self.tasks.push(cx.spawn(async move |this, cx| {
let result = task.await?;
// Update state
this.update(cx, |this, cx| {
this.set_state(result, cx);
})?;
match task.await {
Ok(_) => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Subscribed);
})?;
}
Err(e) => {
this.update(cx, |_this, cx| {
cx.emit(ChatEvent::Error(SharedString::from(e.to_string())));
})?;
}
}
Ok(())
}));
}
// Verify messaging relay list for current user
fn verify_relays(&mut self, cx: &mut Context<Self>) -> Task<Result<InboxState, Error>> {
// Get messaging relay list for current user
fn get_messaging_relays(&self, cx: &App) -> Task<Result<Vec<RelayUrl>, Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
@@ -330,50 +324,25 @@ impl ChatRegistry {
.await?;
while let Some((_url, res)) = stream.next().await {
match res {
Ok(event) => {
return Ok(InboxState::RelayConfigured(Box::new(event)));
}
Err(e) => {
log::error!("Failed to receive relay list event: {e}");
}
if let Ok(event) = res {
let urls: Vec<RelayUrl> = nip17::extract_owned_relay_list(event).collect();
return Ok(urls);
}
}
Ok(InboxState::RelayNotAvailable)
Err(anyhow!("Messaging Relays not found"))
})
}
/// Get all messages for current user
fn get_messages<I>(&mut self, relay_urls: I, cx: &mut Context<Self>)
where
I: IntoIterator<Item = RelayUrl>,
{
let task = self.subscribe(relay_urls, cx);
self.tasks.push(cx.spawn(async move |this, cx| {
task.await?;
// Update state
this.update(cx, |this, cx| {
this.set_state(InboxState::Subscribing, cx);
})?;
Ok(())
}));
}
/// Continuously get gift wrap events for the current user in their messaging relays
fn subscribe<I>(&mut self, urls: I, cx: &mut Context<Self>) -> Task<Result<(), Error>>
where
I: IntoIterator<Item = RelayUrl>,
{
fn subscribe(&self, cx: &App) -> Task<Result<(), Error>> {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let urls = urls.into_iter().collect::<Vec<_>>();
let urls = self.get_messaging_relays(cx);
cx.background_spawn(async move {
let urls = urls.await?;
let public_key = signer.get_public_key().await?;
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
let id = SubscriptionId::new(USER_GIFTWRAP);
@@ -400,19 +369,6 @@ impl ChatRegistry {
})
}
/// Set the state of the inbox
fn set_state(&mut self, state: InboxState, cx: &mut Context<Self>) {
self.state.update(cx, |this, cx| {
*this = state;
cx.notify();
});
}
/// Get the relay state
pub fn state(&self, cx: &App) -> InboxState {
self.state.read(cx).clone()
}
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.tracking_flag.load(Ordering::Acquire)