This commit is contained in:
2026-01-06 10:17:25 +07:00
parent e1ed8483ba
commit 7fb9eb9930
6 changed files with 315 additions and 398 deletions

View File

@@ -10,7 +10,7 @@ use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT};
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task};
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Task, WeakEntity};
use nostr_sdk::prelude::*;
use settings::AppSettings;
use smallvec::{smallvec, SmallVec};
@@ -30,6 +30,28 @@ struct GlobalChatRegistry(Entity<ChatRegistry>);
impl Global for GlobalChatRegistry {}
/// Chat event.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChatEvent {
/// An event to open a room by its ID
OpenRoom(u64),
/// An event to close a room by its ID
CloseRoom(u64),
/// An event to notify UI about a new chat request
Ping,
}
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum NostrEvent {
/// Message received from relay pool
Message(NewMessage),
/// Unwrapping status
Unwrapping(bool),
/// Eose received from relay pool
Eose,
}
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
@@ -40,26 +62,7 @@ pub struct ChatRegistry {
loading: bool,
/// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 4]>,
}
/// Chat event.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChatEvent {
OpenRoom(u64),
CloseRoom(u64),
NewRequest(RoomKind),
}
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Signal {
/// Message received from relay pool
Message(NewMessage),
/// Loading status of the registry
Loading(bool),
/// Eose received from relay pool
Eose,
_tasks: SmallVec<[Task<()>; 3]>,
}
impl EventEmitter<ChatEvent> for ChatRegistry {}
@@ -84,7 +87,7 @@ impl ChatRegistry {
let status = Arc::new(AtomicBool::new(true));
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(2048);
let (tx, rx) = flume::bounded::<NostrEvent>(2048);
let mut tasks = smallvec![];
@@ -102,7 +105,7 @@ impl ChatRegistry {
tasks.push(
// Handle unwrapping progress
cx.background_spawn(
async move { Self::handle_unwrapping(&client, &status, &tx).await },
async move { Self::unwrapping_status(&client, &status, &tx).await },
),
);
@@ -111,19 +114,19 @@ impl ChatRegistry {
cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
NostrEvent::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.ok();
}
Signal::Eose => {
NostrEvent::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.ok();
}
Signal::Loading(status) => {
NostrEvent::Unwrapping(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
@@ -142,7 +145,11 @@ impl ChatRegistry {
}
}
async fn handle_notifications(client: &Client, loading: &Arc<AtomicBool>, tx: &Sender<Signal>) {
async fn handle_notifications(
client: &Client,
loading: &Arc<AtomicBool>,
tx: &Sender<NostrEvent>,
) {
let initialized_at = Timestamp::now();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
@@ -193,7 +200,7 @@ impl ChatRegistry {
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
let signal = NostrEvent::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
@@ -212,7 +219,7 @@ impl ChatRegistry {
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &subscription_id {
if let Err(e) = tx.send_async(Signal::Eose).await {
if let Err(e) = tx.send_async(NostrEvent::Eose).await {
log::error!("Failed to send signal: {}", e);
}
}
@@ -222,7 +229,7 @@ impl ChatRegistry {
}
}
async fn handle_unwrapping(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<Signal>) {
async fn unwrapping_status(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<NostrEvent>) {
let loop_duration = Duration::from_secs(20);
let mut is_start_processing = false;
let mut total_loops = 0;
@@ -238,7 +245,7 @@ impl ChatRegistry {
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(true)).await {
if let Err(e) = tx.send_async(NostrEvent::Unwrapping(true)).await {
log::error!("Failed to send signal: {}", e);
}
} else {
@@ -246,7 +253,7 @@ impl ChatRegistry {
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(false)).await {
if let Err(e) = tx.send_async(NostrEvent::Unwrapping(false)).await {
log::error!("Failed to send signal: {}", e);
}
// Reset the counter
@@ -270,12 +277,12 @@ impl ChatRegistry {
cx.notify();
}
/// Get a room by its ID.
pub fn room(&self, id: &u64, cx: &App) -> Option<Entity<Room>> {
/// Get a weak reference to a room by its ID.
pub fn room(&self, id: &u64, cx: &App) -> Option<WeakEntity<Room>> {
self.rooms
.iter()
.find(|model| model.read(cx).id == *id)
.cloned()
.find(|this| &this.read(cx).id == id)
.map(|this| this.downgrade())
}
/// Get all ongoing rooms.
@@ -297,11 +304,30 @@ impl ChatRegistry {
}
/// Add a new room to the start of list.
pub fn add_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
self.rooms.insert(0, room);
pub fn add_room<I>(&mut self, room: I, cx: &mut Context<Self>)
where
I: Into<Room>,
{
self.rooms.insert(0, cx.new(|_| room.into()));
cx.notify();
}
/// Emit an open room event.
/// If the room is new, add it to the registry.
pub fn emit_room(&mut self, room: WeakEntity<Room>, cx: &mut Context<Self>) {
if let Some(room) = room.upgrade() {
let id = room.read(cx).id;
// If the room is new, add it to the registry.
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.rooms.insert(0, room);
}
// Emit the open room event.
cx.emit(ChatEvent::OpenRoom(id));
}
}
/// Close a room.
pub fn close_room(&mut self, id: u64, cx: &mut Context<Self>) {
if self.rooms.iter().any(|r| r.read(cx).id == id) {
@@ -345,17 +371,6 @@ impl ChatRegistry {
cx.notify();
}
/// Push a new room to the chat registry
pub fn push_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
let id = room.read(cx).id;
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.add_room(room, cx);
}
cx.emit(ChatEvent::OpenRoom(id));
}
/// Extend the registry with new rooms.
fn extend_rooms(&mut self, rooms: HashSet<Room>, cx: &mut Context<Self>) {
let mut room_map: HashMap<u64, usize> = self
@@ -506,39 +521,45 @@ impl ChatRegistry {
/// If the room doesn't exist, it will be created.
/// Updates room ordering based on the most recent messages.
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let id = message.rumor.uniq_id();
let author = message.rumor.pubkey;
let nostr = NostrRegistry::global(cx);
// Get the unique id
let id = message.rumor.uniq_id();
// Get the author
let author = message.rumor.pubkey;
if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) {
let is_new_event = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
match self.rooms.iter().find(|room| room.read(cx).id == id) {
Some(room) => {
let new_message = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
// Update room
room.update(cx, |this, cx| {
if is_new_event {
this.set_created_at(created_at, cx);
// Update room
room.update(cx, |this, cx| {
// Update the last timestamp if the new message is newer
if new_message {
this.set_created_at(created_at, cx);
}
// Set this room is ongoing if the new message is from current user
if author == nostr.read(cx).identity().read(cx).public_key() {
this.set_ongoing(cx);
}
// Emit the new message to the room
this.emit_message(message, cx);
});
// Resort all rooms in the registry by their created at (after updated)
if new_message {
self.sort(cx);
}
// Set this room is ongoing if the new message is from current user
if author == nostr.read(cx).identity().read(cx).public_key() {
this.set_ongoing(cx);
}
// Emit the new message to the room
this.emit_message(message, cx);
});
// Resort all rooms in the registry by their created at (after updated)
if is_new_event {
self.sort(cx);
}
} else {
// Push the new room to the front of the list
self.add_room(cx.new(|_| Room::from(&message.rumor)), cx);
None => {
// Push the new room to the front of the list
self.add_room(&message.rumor, cx);
// Notify the UI about the new room
cx.emit(ChatEvent::NewRequest(RoomKind::default()));
// Notify the UI about the new room
cx.emit(ChatEvent::Ping);
}
}
}