wip: refactor

This commit is contained in:
2024-12-15 07:53:53 +07:00
parent f82eaa4ac3
commit 2b9c88c5b7
5 changed files with 88 additions and 12 deletions

View File

@@ -4,14 +4,18 @@ use dirs::config_dir;
use gpui::*;
use nostr_sdk::prelude::*;
use std::{
collections::HashSet,
fs,
str::FromStr,
sync::{Arc, OnceLock},
time::Duration,
};
use tokio::sync::mpsc;
use tokio::{
sync::{broadcast, mpsc},
time::sleep,
};
use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, NEW_MESSAGE_SUB_ID};
use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, METADATA_DELAY, NEW_MESSAGE_SUB_ID};
use states::{account::AccountRegistry, chat::ChatRegistry, signal::SignalRegistry};
use views::app::AppView;
@@ -77,8 +81,12 @@ async fn main() {
let (new_chat_tx, mut new_chat_rx) = mpsc::channel::<Event>(1000); // TODO: adjust?
// Channel for all chats
// When receive EOSE from relay(s). Reload UI
let (all_chats_tx, mut all_chats_rx) = mpsc::channel::<i32>(1);
// Channel for metadata request queue
let (queue_tx, mut queue_rx) = broadcast::channel::<PublicKey>(100);
tokio::spawn(async move {
let sig = Signature::from_str(FAKE_SIG).unwrap();
let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
@@ -115,9 +123,11 @@ async fn main() {
// Save rumor to database to further query
_ = client.database().save_event(&ev).await;
// Send event to channel
// Send event back to channel
if subscription_id == new_message_sub_id {
_ = new_chat_tx.send(ev).await;
if let Err(e) = new_chat_tx.send(ev).await {
println!("Error: {}", e)
}
}
}
}
@@ -133,13 +143,47 @@ async fn main() {
}
});
tokio::spawn(async move {
let mut queue: HashSet<PublicKey> = HashSet::new();
while let Ok(public_key) = queue_rx.recv().await {
queue.insert(public_key);
// Wait for METADATA_DELAY
sleep(Duration::from_millis(METADATA_DELAY)).await;
if !queue.is_empty() {
let authors: Vec<PublicKey> = queue.iter().copied().collect();
let total = authors.len();
let filter = Filter::new()
.authors(authors)
.kind(Kind::Metadata)
.limit(total);
let opts = SubscribeAutoCloseOptions::default()
.filter(FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2)));
// Clear queue
queue.clear();
if let Err(e) = client.subscribe(vec![filter], Some(opts)).await {
println!("Error: {}", e);
}
}
}
});
App::new()
.with_assets(Assets)
.with_http_client(Arc::new(reqwest_client::ReqwestClient::new()))
.run(move |cx| {
// Account state
AccountRegistry::set_global(cx);
// Chat state
ChatRegistry::set_global(cx);
SignalRegistry::set_global(cx);
// Hold all metadata requests and merged it
SignalRegistry::set_global(cx, Arc::new(queue_tx));
// Initialize components
coop_ui::init(cx);