chore: improve message fetching
This commit is contained in:
@@ -12,9 +12,10 @@ use gpui::{
|
||||
};
|
||||
use i18n::{shared_t, t};
|
||||
use identity::Identity;
|
||||
use itertools::Itertools;
|
||||
use nostr_connect::prelude::*;
|
||||
use nostr_sdk::prelude::*;
|
||||
use registry::{Registry, RoomEmitter};
|
||||
use registry::{Registry, RegistrySignal};
|
||||
use serde::Deserialize;
|
||||
use settings::AppSettings;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
@@ -194,7 +195,7 @@ impl ChatSpace {
|
||||
window,
|
||||
|this: &mut Self, _state, event, window, cx| {
|
||||
match event {
|
||||
RoomEmitter::Open(room) => {
|
||||
RegistrySignal::Open(room) => {
|
||||
if let Some(room) = room.upgrade() {
|
||||
this.dock.update(cx, |this, cx| {
|
||||
let panel = chat::init(room, window, cx);
|
||||
@@ -209,7 +210,7 @@ impl ChatSpace {
|
||||
window.push_notification(t!("common.room_error"), cx);
|
||||
}
|
||||
}
|
||||
RoomEmitter::Close(..) => {
|
||||
RegistrySignal::Close(..) => {
|
||||
this.dock.update(cx, |this, cx| {
|
||||
this.focus_tab_panel(window, cx);
|
||||
|
||||
@@ -415,6 +416,28 @@ impl ChatSpace {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn all_panels(window: &mut Window, cx: &mut App) -> Option<Vec<u64>> {
|
||||
let Some(Some(root)) = window.root::<Root>() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let Ok(chatspace) = root.read(cx).view().clone().downcast::<ChatSpace>() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let ids = chatspace
|
||||
.read(cx)
|
||||
.dock
|
||||
.read(cx)
|
||||
.items
|
||||
.panel_ids(cx)
|
||||
.into_iter()
|
||||
.filter_map(|panel| panel.parse::<u64>().ok())
|
||||
.collect_vec();
|
||||
|
||||
Some(ids)
|
||||
}
|
||||
}
|
||||
|
||||
impl Render for ChatSpace {
|
||||
|
||||
@@ -4,23 +4,27 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
use assets::Assets;
|
||||
use common::event::EventUtils;
|
||||
use global::constants::{
|
||||
ALL_MESSAGES_SUB_ID, APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT,
|
||||
METADATA_BATCH_TIMEOUT, NEW_MESSAGE_SUB_ID, SEARCH_RELAYS,
|
||||
ALL_MESSAGES_ID, APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT,
|
||||
METADATA_BATCH_TIMEOUT, NEW_MESSAGE_ID, SEARCH_RELAYS,
|
||||
};
|
||||
use global::{nostr_client, NostrSignal};
|
||||
use global::{nostr_client, set_all_gift_wraps_fetched, NostrSignal};
|
||||
use gpui::{
|
||||
actions, point, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem,
|
||||
SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations,
|
||||
WindowKind, WindowOptions,
|
||||
};
|
||||
use identity::Identity;
|
||||
use itertools::Itertools;
|
||||
use nostr_sdk::prelude::*;
|
||||
use registry::Registry;
|
||||
use smol::channel::{self, Sender};
|
||||
use theme::Theme;
|
||||
use ui::Root;
|
||||
|
||||
use crate::chatspace::ChatSpace;
|
||||
|
||||
pub(crate) mod chatspace;
|
||||
pub(crate) mod views;
|
||||
|
||||
@@ -130,7 +134,7 @@ fn main() {
|
||||
let duration = smol::Timer::after(Duration::from_secs(30));
|
||||
|
||||
let recv = || async {
|
||||
// prevent inline format
|
||||
// no inline
|
||||
(event_rx.recv().await).ok()
|
||||
};
|
||||
|
||||
@@ -142,11 +146,10 @@ fn main() {
|
||||
match smol::future::or(recv(), timeout()).await {
|
||||
Some(event) => {
|
||||
// Process the gift wrap event unwrapping
|
||||
let is_cached =
|
||||
try_unwrap_event(client, &signal_tx, &mta_tx, &event, false).await;
|
||||
let cached = try_unwrap_event(&signal_tx, &mta_tx, &event, false).await;
|
||||
|
||||
// Increment the total messages counter if message is not from cache
|
||||
if !is_cached {
|
||||
if !cached {
|
||||
counter += 1;
|
||||
}
|
||||
|
||||
@@ -158,7 +161,12 @@ fn main() {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Notify the UI that the processing is finished
|
||||
signal_tx.send(NostrSignal::Finish).await.ok();
|
||||
// Mark all gift wraps as fetched
|
||||
// For the next time Coop only needs to process new gift wraps
|
||||
set_all_gift_wraps_fetched().await;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -239,7 +247,7 @@ fn main() {
|
||||
|
||||
// Spawn a task to handle events from nostr channel
|
||||
cx.spawn_in(window, async move |_, cx| {
|
||||
let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
|
||||
let all_messages = SubscriptionId::new(ALL_MESSAGES_ID);
|
||||
|
||||
while let Ok(signal) = signal_rx.recv().await {
|
||||
cx.update(|window, cx| {
|
||||
@@ -252,18 +260,26 @@ fn main() {
|
||||
registry.update(cx, |this, cx| {
|
||||
this.load_rooms(window, cx);
|
||||
this.set_loading(false, cx);
|
||||
// Send a signal to refresh all opened rooms' messages
|
||||
if let Some(ids) = ChatSpace::all_panels(window, cx) {
|
||||
this.refresh_rooms(ids, cx);
|
||||
}
|
||||
});
|
||||
}
|
||||
// Load chat rooms without setting as finished
|
||||
NostrSignal::PartialFinish => {
|
||||
registry.update(cx, |this, cx| {
|
||||
this.load_rooms(window, cx);
|
||||
// Send a signal to refresh all opened rooms' messages
|
||||
if let Some(ids) = ChatSpace::all_panels(window, cx) {
|
||||
this.refresh_rooms(ids, cx);
|
||||
}
|
||||
});
|
||||
}
|
||||
// Load chat rooms without setting as finished
|
||||
NostrSignal::Eose(subscription_id) => {
|
||||
// Only load chat rooms if the subscription ID matches the all_messages_sub_id
|
||||
if subscription_id == all_messages_sub_id {
|
||||
if subscription_id == all_messages {
|
||||
registry.update(cx, |this, cx| {
|
||||
this.load_rooms(window, cx);
|
||||
});
|
||||
@@ -354,7 +370,7 @@ async fn handle_nostr_notifications(
|
||||
mta_tx: &Sender<PublicKey>,
|
||||
event_tx: &Sender<Event>,
|
||||
) -> Result<(), Error> {
|
||||
let new_messages_sub_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID);
|
||||
let new_messages = SubscriptionId::new(NEW_MESSAGE_ID);
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
|
||||
let mut notifications = client.notifications();
|
||||
@@ -379,9 +395,9 @@ async fn handle_nostr_notifications(
|
||||
|
||||
match event.kind {
|
||||
Kind::GiftWrap => {
|
||||
if *subscription_id == new_messages_sub_id {
|
||||
if *subscription_id == new_messages {
|
||||
let event = event.as_ref();
|
||||
_ = try_unwrap_event(client, signal_tx, mta_tx, event, false).await;
|
||||
_ = try_unwrap_event(signal_tx, mta_tx, event, false).await;
|
||||
} else {
|
||||
event_tx.send(event.into_owned()).await.ok();
|
||||
}
|
||||
@@ -459,52 +475,56 @@ async fn sync_data_for_pubkeys(client: &Client, public_keys: BTreeSet<PublicKey>
|
||||
}
|
||||
|
||||
/// Stores an unwrapped event in local database with reference to original
|
||||
async fn set_unwrapped(client: &Client, root: EventId, event: &Event) -> Result<(), Error> {
|
||||
// Must be use the random generated keys to sign this event
|
||||
let event = EventBuilder::new(Kind::ApplicationSpecificData, event.as_json())
|
||||
.tags(vec![Tag::identifier(root), Tag::event(root)])
|
||||
async fn set_unwrapped(root: EventId, unwrapped: &Event) -> Result<(), Error> {
|
||||
let client = nostr_client();
|
||||
|
||||
// Save unwrapped event
|
||||
client.database().save_event(unwrapped).await?;
|
||||
|
||||
// Create a reference event pointing to the unwrapped event
|
||||
let event = EventBuilder::new(Kind::ApplicationSpecificData, "")
|
||||
.tags(vec![Tag::identifier(root), Tag::event(unwrapped.id)])
|
||||
.sign(&Keys::generate())
|
||||
.await?;
|
||||
|
||||
// Only save this event into the local database
|
||||
// Save reference event
|
||||
client.database().save_event(&event).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves a previously unwrapped event from local database
|
||||
async fn get_unwrapped(client: &Client, target: EventId) -> Result<Event, Error> {
|
||||
async fn get_unwrapped(root: EventId) -> Result<Event, Error> {
|
||||
let client = nostr_client();
|
||||
let filter = Filter::new()
|
||||
.kind(Kind::ApplicationSpecificData)
|
||||
.identifier(target)
|
||||
.event(target)
|
||||
.identifier(root)
|
||||
.limit(1);
|
||||
|
||||
if let Some(event) = client.database().query(filter).await?.first_owned() {
|
||||
Ok(Event::from_json(event.content)?)
|
||||
let target_id = event.tags.event_ids().collect_vec()[0];
|
||||
|
||||
if let Some(event) = client.database().event_by_id(target_id).await? {
|
||||
Ok(event)
|
||||
} else {
|
||||
Err(anyhow!("Event not found."))
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Event is not cached yet"))
|
||||
Err(anyhow!("Event is not cached yet."))
|
||||
}
|
||||
}
|
||||
|
||||
/// Unwraps a gift-wrapped event and processes its contents.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `event` - The gift-wrapped event to unwrap
|
||||
/// * `incoming` - Whether this is a newly received event (true) or old
|
||||
///
|
||||
/// # Returns
|
||||
/// Returns `true` if the event was successfully loaded from cache or saved after unwrapping.
|
||||
async fn try_unwrap_event(
|
||||
client: &Client,
|
||||
signal_tx: &Sender<NostrSignal>,
|
||||
mta_tx: &Sender<PublicKey>,
|
||||
event: &Event,
|
||||
incoming: bool,
|
||||
) -> bool {
|
||||
let client = nostr_client();
|
||||
let mut is_cached = false;
|
||||
|
||||
let event = match get_unwrapped(client, event.id).await {
|
||||
let event = match get_unwrapped(event.id).await {
|
||||
Ok(event) => {
|
||||
is_cached = true;
|
||||
event
|
||||
@@ -512,31 +532,32 @@ async fn try_unwrap_event(
|
||||
Err(_) => {
|
||||
match client.unwrap_gift_wrap(event).await {
|
||||
Ok(unwrap) => {
|
||||
// Sign the unwrapped event with a RANDOM KEYS
|
||||
let Ok(unwrapped) = unwrap.rumor.sign_with_keys(&Keys::generate()) else {
|
||||
log::error!("Failed to sign event");
|
||||
return false;
|
||||
};
|
||||
|
||||
// Save this event to the database for future use.
|
||||
if let Err(e) = set_unwrapped(client, event.id, &unwrapped).await {
|
||||
log::error!("Failed to save event: {e}")
|
||||
if let Err(e) = set_unwrapped(event.id, &unwrapped).await {
|
||||
log::warn!("Failed to cache unwrapped event: {e}")
|
||||
}
|
||||
|
||||
unwrapped
|
||||
}
|
||||
Err(_) => return false,
|
||||
Err(e) => {
|
||||
log::error!("Failed to unwrap event: {e}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Save the event to the database, use for query directly.
|
||||
if let Err(e) = client.database().save_event(&event).await {
|
||||
log::error!("Failed to save event: {e}")
|
||||
}
|
||||
// Get all pubkeys from the event
|
||||
let all_pubkeys = event.all_pubkeys();
|
||||
|
||||
// Send all pubkeys to the batch to sync metadata
|
||||
mta_tx.send(event.pubkey).await.ok();
|
||||
|
||||
for public_key in event.tags.public_keys().copied() {
|
||||
// Send all pubkeys to the metadata batch to sync data
|
||||
for public_key in all_pubkeys {
|
||||
mta_tx.send(public_key).await.ok();
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ use identity::Identity;
|
||||
use itertools::Itertools;
|
||||
use nostr_sdk::prelude::*;
|
||||
use registry::message::Message;
|
||||
use registry::room::{Room, RoomKind, SendError};
|
||||
use registry::room::{Room, RoomKind, RoomSignal, SendError};
|
||||
use registry::Registry;
|
||||
use serde::Deserialize;
|
||||
use settings::AppSettings;
|
||||
@@ -111,21 +111,28 @@ impl Chat {
|
||||
subscriptions.push(cx.subscribe_in(
|
||||
&room,
|
||||
window,
|
||||
move |this, _, incoming, _window, cx| {
|
||||
// Check if the incoming message is the same as the new message created by optimistic update
|
||||
if this.prevent_duplicate_message(&incoming.0, cx) {
|
||||
return;
|
||||
move |this, _, signal, window, cx| {
|
||||
match signal {
|
||||
RoomSignal::NewMessage(event) => {
|
||||
// Check if the incoming message is the same as the new message created by optimistic update
|
||||
if this.prevent_duplicate_message(event, cx) {
|
||||
return;
|
||||
}
|
||||
|
||||
let old_len = this.messages.read(cx).len();
|
||||
let message = event.clone().into_rc();
|
||||
|
||||
cx.update_entity(&this.messages, |this, cx| {
|
||||
this.extend(vec![message]);
|
||||
cx.notify();
|
||||
});
|
||||
|
||||
this.list_state.splice(old_len..old_len, 1);
|
||||
}
|
||||
RoomSignal::Refresh => {
|
||||
this.load_messages(window, cx);
|
||||
}
|
||||
}
|
||||
|
||||
let old_len = this.messages.read(cx).len();
|
||||
let message = incoming.0.clone().into_rc();
|
||||
|
||||
cx.update_entity(&this.messages, |this, cx| {
|
||||
this.extend(vec![message]);
|
||||
cx.notify();
|
||||
});
|
||||
|
||||
this.list_state.splice(old_len..old_len, 1);
|
||||
},
|
||||
));
|
||||
|
||||
@@ -142,10 +149,10 @@ impl Chat {
|
||||
});
|
||||
|
||||
Self {
|
||||
id: room.read(cx).id.to_string().into(),
|
||||
image_cache: RetainAllImageCache::new(cx),
|
||||
focus_handle: cx.focus_handle(),
|
||||
uploading: false,
|
||||
id: room.read(cx).id.to_string().into(),
|
||||
text_data: HashMap::new(),
|
||||
room,
|
||||
messages,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
use global::constants::{ALL_MESSAGES_SUB_ID, NEW_MESSAGE_SUB_ID, NIP17_RELAYS};
|
||||
use global::constants::{NEW_MESSAGE_ID, NIP17_RELAYS};
|
||||
use global::nostr_client;
|
||||
use gpui::prelude::FluentBuilder;
|
||||
use gpui::{
|
||||
@@ -10,6 +10,7 @@ use gpui::{
|
||||
TextAlign, UniformList, Window,
|
||||
};
|
||||
use i18n::{shared_t, t};
|
||||
use identity::Identity;
|
||||
use itertools::Itertools;
|
||||
use nostr_sdk::prelude::*;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
@@ -207,28 +208,17 @@ impl MessagingRelays {
|
||||
_ = client.connect_relay(&relay).await;
|
||||
}
|
||||
|
||||
let all_msg_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
|
||||
let new_msg_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID);
|
||||
|
||||
let all_messages = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
|
||||
let id = SubscriptionId::new(NEW_MESSAGE_ID);
|
||||
let new_messages = Filter::new()
|
||||
.kind(Kind::GiftWrap)
|
||||
.pubkey(public_key)
|
||||
.limit(0);
|
||||
|
||||
// Close old subscriptions
|
||||
client.unsubscribe(&all_msg_id).await;
|
||||
client.unsubscribe(&new_msg_id).await;
|
||||
|
||||
// Subscribe to all messages
|
||||
client
|
||||
.subscribe_with_id(all_msg_id, all_messages, None)
|
||||
.await?;
|
||||
client.unsubscribe(&id).await;
|
||||
|
||||
// Subscribe to new messages
|
||||
client
|
||||
.subscribe_with_id(new_msg_id, new_messages, None)
|
||||
.await?;
|
||||
client.subscribe_with_id(id, new_messages, None).await?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
@@ -237,6 +227,10 @@ impl MessagingRelays {
|
||||
match task.await {
|
||||
Ok(_) => {
|
||||
cx.update(|window, cx| {
|
||||
Identity::global(cx).update(cx, |this, cx| {
|
||||
this.verify_dm_relays(window, cx);
|
||||
});
|
||||
// Close the current modal
|
||||
window.close_modal(cx);
|
||||
})
|
||||
.ok();
|
||||
|
||||
@@ -20,7 +20,7 @@ use itertools::Itertools;
|
||||
use list_item::RoomListItem;
|
||||
use nostr_sdk::prelude::*;
|
||||
use registry::room::{Room, RoomKind};
|
||||
use registry::{Registry, RoomEmitter};
|
||||
use registry::{Registry, RegistrySignal};
|
||||
use settings::AppSettings;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use theme::ActiveTheme;
|
||||
@@ -83,7 +83,7 @@ impl Sidebar {
|
||||
&chats,
|
||||
window,
|
||||
move |this, _chats, event, _window, cx| {
|
||||
if let RoomEmitter::Request(kind) = event {
|
||||
if let RegistrySignal::NewRequest(kind) = event {
|
||||
this.indicator.update(cx, |this, cx| {
|
||||
*this = Some(kind.to_owned());
|
||||
cx.notify();
|
||||
|
||||
Reference in New Issue
Block a user