feat: wait for processing to complete (#66)

* wait instead of check eose

* refactor

* refactor

* refactor

* improve extend rooms function

* .
This commit is contained in:
reya
2025-06-23 09:00:56 +07:00
committed by GitHub
parent 1d77fd443e
commit c7e3331eb0
18 changed files with 650 additions and 484 deletions

View File

@@ -1,5 +1,5 @@
use std::cmp::Reverse;
use std::collections::BTreeSet;
use std::{cmp::Reverse, collections::HashMap};
use anyhow::Error;
use common::room_hash;
@@ -46,13 +46,15 @@ pub enum RoomEmitter {
pub struct ChatRegistry {
/// Collection of all chat rooms
pub rooms: Vec<Entity<Room>>,
/// Indicates if rooms are currently being loaded
///
/// Always equal to `true` when the app starts
pub wait_for_eose: bool,
pub loading: bool,
/// Subscriptions for observing changes
#[allow(dead_code)]
subscriptions: SmallVec<[Subscription; 2]>,
subscriptions: SmallVec<[Subscription; 1]>,
}
impl EventEmitter<RoomEmitter> for ChatRegistry {}
@@ -77,13 +79,6 @@ impl ChatRegistry {
fn new(cx: &mut Context<Self>) -> Self {
let mut subscriptions = smallvec![];
// When the ChatRegistry is created, load all rooms from the local database
subscriptions.push(cx.observe_new::<Self>(|this, window, cx| {
if let Some(window) = window {
this.load_rooms(window, cx);
}
}));
// When any Room is created, load metadata for all members
subscriptions.push(cx.observe_new::<Room>(|this, _window, cx| {
this.load_metadata(cx).detach();
@@ -91,7 +86,7 @@ impl ChatRegistry {
Self {
rooms: vec![],
wait_for_eose: true,
loading: true,
subscriptions,
}
}
@@ -104,11 +99,6 @@ impl ChatRegistry {
.cloned()
}
/// Get room by its position.
pub fn room_by_ix(&self, ix: usize, _cx: &App) -> Option<&Entity<Room>> {
self.rooms.get(ix)
}
/// Get all ongoing rooms.
pub fn ongoing_rooms(&self, cx: &App) -> Vec<Entity<Room>> {
self.rooms
@@ -162,12 +152,13 @@ impl ChatRegistry {
/// 3. Determines each room's type based on message frequency and trust status
/// 4. Creates Room entities for each unique room
pub fn load_rooms(&mut self, window: &mut Window, cx: &mut Context<Self>) {
let client = &shared_state().client;
let Some(public_key) = Identity::get_global(cx).profile().map(|i| i.public_key()) else {
return;
};
log::info!("Starting to load rooms from database...");
let task: Task<Result<BTreeSet<Room>, Error>> = cx.background_spawn(async move {
let client = shared_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
// Get messages sent by the user
let send = Filter::new()
.kind(Kind::PrivateDirectMessage)
@@ -206,7 +197,7 @@ impl ChatRegistry {
// Check if room's author is seen in any contact list
let filter = Filter::new().kind(Kind::ContactList).pubkey(event.pubkey);
// If room's author is seen at least once, mark as trusted
is_trust = client.database().count(filter).await? >= 1;
is_trust = client.database().count(filter).await.unwrap_or(0) >= 1;
if is_trust {
trusted_keys.insert(event.pubkey);
@@ -218,8 +209,9 @@ impl ChatRegistry {
.kind(Kind::PrivateDirectMessage)
.author(public_key)
.pubkeys(public_keys);
// If current user has sent a message at least once, mark as ongoing
let is_ongoing = client.database().count(filter).await? >= 1;
let is_ongoing = client.database().count(filter).await.unwrap_or(1) >= 1;
if is_ongoing {
rooms.insert(Room::new(&event).kind(RoomKind::Ongoing));
@@ -234,33 +226,45 @@ impl ChatRegistry {
});
cx.spawn_in(window, async move |this, cx| {
let rooms = task
.await
.expect("Failed to load chat rooms. Please restart the application.");
this.update(cx, |this, cx| {
this.wait_for_eose = false;
this.rooms.extend(
rooms
.into_iter()
.sorted_by_key(|room| Reverse(room.created_at))
.filter_map(|room| {
if !this.rooms.iter().any(|this| this.read(cx).id == room.id) {
Some(cx.new(|_| room))
} else {
None
}
})
.collect_vec(),
);
cx.notify();
})
.ok();
match task.await {
Ok(rooms) => {
this.update(cx, |this, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.ok();
}
Err(e) => {
// TODO: push notification
log::error!("Failed to load rooms: {e}")
}
};
})
.detach();
}
pub(crate) fn extend_rooms(&mut self, rooms: BTreeSet<Room>, cx: &mut Context<Self>) {
let mut room_map: HashMap<u64, usize> = HashMap::with_capacity(self.rooms.len());
for (index, room) in self.rooms.iter().enumerate() {
room_map.insert(room.read(cx).id, index);
}
for new_room in rooms.into_iter() {
// Check if we already have a room with this ID
if let Some(&index) = room_map.get(&new_room.id) {
self.rooms[index].update(cx, |this, cx| {
*this = new_room;
cx.notify();
});
} else {
let new_index = self.rooms.len();
room_map.insert(new_room.id, new_index);
self.rooms.push(cx.new(|_| new_room));
}
}
}
/// Push a new Room to the global registry
pub fn push_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
let weak_room = if let Some(room) = self
@@ -324,4 +328,9 @@ impl ChatRegistry {
cx.notify();
}
}
pub fn set_loading(&mut self, status: bool, cx: &mut Context<Self>) {
self.loading = status;
cx.notify();
}
}

View File

@@ -324,30 +324,18 @@ impl Room {
///
/// # Returns
///
/// A Task that resolves to Result<Vec<(PublicKey, Option<Metadata>)>, Error>
#[allow(clippy::type_complexity)]
/// A Task that resolves to Result<(), Error>
pub fn load_metadata(&self, cx: &mut Context<Self>) -> Task<Result<(), Error>> {
let public_keys = Arc::clone(&self.members);
cx.background_spawn(async move {
for public_key in public_keys.iter() {
let metadata = shared_state()
.client
.database()
.metadata(*public_key)
.await?;
let database = shared_state().client().database();
shared_state()
.persons
.write()
.await
.entry(*public_key)
.and_modify(|entry| {
if entry.is_none() {
*entry = metadata.clone();
}
})
.or_insert_with(|| metadata);
for public_key in public_keys.iter().cloned() {
if !shared_state().has_person(&public_key).await {
let metadata = database.metadata(public_key).await?;
shared_state().insert_person(public_key, metadata).await;
}
}
Ok(())
@@ -368,6 +356,7 @@ impl Room {
let pubkeys = Arc::clone(&self.members);
cx.background_spawn(async move {
let database = shared_state().client().database();
let mut result = Vec::with_capacity(pubkeys.len());
for pubkey in pubkeys.iter() {
@@ -375,13 +364,7 @@ impl Room {
.kind(Kind::InboxRelays)
.author(*pubkey)
.limit(1);
let is_ready = shared_state()
.client
.database()
.query(filter)
.await?
.first()
.is_some();
let is_ready = database.query(filter).await?.first().is_some();
result.push((*pubkey, is_ready));
}
@@ -410,11 +393,10 @@ impl Room {
cx.background_spawn(async move {
let mut messages = vec![];
let parser = NostrParser::new();
let database = shared_state().client().database();
// Get all events from database
let events = shared_state()
.client
.database()
let events = database
.query(filter)
.await?
.into_iter()
@@ -637,7 +619,8 @@ impl Room {
let backup = AppSettings::get_global(cx).settings.backup_messages;
cx.background_spawn(async move {
let signer = shared_state().client.signer().await?;
let client = shared_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let mut reports = vec![];
@@ -680,13 +663,11 @@ impl Room {
};
for receiver in receivers.iter() {
if let Err(e) = shared_state()
.client
if let Err(e) = client
.send_private_msg(*receiver, &content, tags.clone())
.await
{
let metadata = shared_state()
.client
let metadata = client
.database()
.metadata(*receiver)
.await?
@@ -703,13 +684,11 @@ impl Room {
// Only send a backup message to current user if there are no issues when sending to others
if backup && reports.is_empty() {
if let Err(e) = shared_state()
.client
if let Err(e) = client
.send_private_msg(*current_user, &content, tags.clone())
.await
{
let metadata = shared_state()
.client
let metadata = client
.database()
.metadata(*current_user)
.await?