wip: refactor

This commit is contained in:
2024-12-17 18:51:06 +07:00
parent 377f169420
commit 0682612d42
27 changed files with 486 additions and 259 deletions

View File

@@ -10,13 +10,14 @@ use std::{
sync::{Arc, OnceLock},
time::Duration,
};
use tokio::{
sync::{broadcast, mpsc},
time::sleep,
};
use tokio::{sync::mpsc, time::sleep};
use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, METADATA_DELAY, NEW_MESSAGE_SUB_ID};
use states::{account::AccountRegistry, chat::ChatRegistry, signal::SignalRegistry};
use states::{
account::AccountRegistry,
chat::ChatRegistry,
metadata::{MetadataRegistry, Signal},
};
use views::app::AppView;
pub mod asset;
@@ -74,23 +75,24 @@ async fn main() {
// Connect to all relays
_ = client.connect().await;
// Channel for metadata signal
let (signal_tx, mut signal_rx) = mpsc::channel::<PublicKey>(1000); // TODO: adjust?
// Channel for EOSE
// When receive EOSE from relay(s) -> Load all rooms and push it into UI.
let (eose_tx, mut eose_rx) = mpsc::channel::<SubscriptionId>(200);
// Channel for new chat
let (new_chat_tx, mut new_chat_rx) = mpsc::channel::<Event>(1000); // TODO: adjust?
// Channel for new message
// Push new message to chat panel or create new chat room if not exist.
let (message_tx, message_rx) = flume::unbounded::<Event>();
let message_rx_clone = message_rx.clone();
// Channel for all chats
// When receive EOSE from relay(s). Reload UI
let (all_chats_tx, mut all_chats_rx) = mpsc::channel::<i32>(1);
// Channel for metadata request queue
let (queue_tx, mut queue_rx) = broadcast::channel::<PublicKey>(100);
// Channel for signal
// Merge all metadata requests into single one.
// Notify to reload element if receive new metadata.
let (signal_tx, mut signal_rx) = mpsc::channel::<Signal>(5000);
let signal_tx_clone = signal_tx.clone();
tokio::spawn(async move {
let sig = Signature::from_str(FAKE_SIG).unwrap();
let all_messages_sub_id = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
let new_message_sub_id = SubscriptionId::new(NEW_MESSAGE_SUB_ID);
let new_message = SubscriptionId::new(NEW_MESSAGE_SUB_ID);
while let Ok(notification) = notifications.recv().await {
#[allow(clippy::collapsible_match)]
@@ -121,69 +123,42 @@ async fn main() {
);
// Save rumor to database to further query
_ = client.database().save_event(&ev).await;
if let Err(e) = client.database().save_event(&ev).await {
println!("Save error: {}", e);
}
// Send event back to channel
if subscription_id == new_message_sub_id {
if let Err(e) = new_chat_tx.send(ev).await {
if subscription_id == new_message {
if let Err(e) = message_tx.send_async(ev).await {
println!("Error: {}", e)
}
}
}
}
} else if event.kind == Kind::Metadata {
_ = signal_tx.send(event.pubkey).await;
if let Err(e) = signal_tx.send(Signal::DONE(event.pubkey)).await {
println!("Error: {}", e)
}
}
} else if let RelayMessage::EndOfStoredEvents(subscription_id) = message {
if all_messages_sub_id == subscription_id {
_ = all_chats_tx.send(1).await;
if let Err(e) = eose_tx.send(subscription_id).await {
println!("Error: {}", e)
}
}
}
}
});
tokio::spawn(async move {
let mut queue: HashSet<PublicKey> = HashSet::new();
while let Ok(public_key) = queue_rx.recv().await {
queue.insert(public_key);
// Wait for METADATA_DELAY
sleep(Duration::from_millis(METADATA_DELAY)).await;
if !queue.is_empty() {
let authors: Vec<PublicKey> = queue.iter().copied().collect();
let total = authors.len();
let filter = Filter::new()
.authors(authors)
.kind(Kind::Metadata)
.limit(total);
let opts = SubscribeAutoCloseOptions::default()
.filter(FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2)));
// Clear queue
queue.clear();
if let Err(e) = client.subscribe(vec![filter], Some(opts)).await {
println!("Error: {}", e);
}
}
}
});
App::new()
.with_assets(Assets)
.with_http_client(Arc::new(reqwest_client::ReqwestClient::new()))
.run(move |cx| {
// Account state
AccountRegistry::set_global(cx);
// Metadata state
MetadataRegistry::set_global(cx, signal_tx_clone);
// Chat state
ChatRegistry::set_global(cx);
// Hold all metadata requests and merged it
SignalRegistry::set_global(cx, Arc::new(queue_tx));
ChatRegistry::set_global(cx, message_rx);
// Initialize components
coop_ui::init(cx);
@@ -192,16 +167,55 @@ async fn main() {
cx.on_action(quit);
cx.spawn(|async_cx| async move {
while let Some(public_key) = signal_rx.recv().await {
_ = async_cx.update_global::<SignalRegistry, _>(|state, _cx| {
state.push(public_key);
});
let mut queue: HashSet<PublicKey> = HashSet::new();
while let Some(signal) = signal_rx.recv().await {
match signal {
Signal::REQ(public_key) => {
queue.insert(public_key);
// Wait for METADATA_DELAY
sleep(Duration::from_millis(METADATA_DELAY)).await;
if !queue.is_empty() {
let authors: Vec<PublicKey> = queue.iter().copied().collect();
let total = authors.len();
let filter = Filter::new()
.authors(authors)
.kind(Kind::Metadata)
.limit(total);
let opts = SubscribeAutoCloseOptions::default().filter(
FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2)),
);
queue.clear();
async_cx
.background_executor()
.spawn(async move {
if let Err(e) =
client.subscribe(vec![filter], Some(opts)).await
{
println!("Error: {}", e);
}
})
.await;
}
}
Signal::DONE(public_key) => {
_ = async_cx.update_global::<MetadataRegistry, _>(|state, _| {
state.seen(public_key);
});
}
}
}
})
.detach();
cx.spawn(|async_cx| async move {
while let Some(event) = new_chat_rx.recv().await {
while let Ok(event) = message_rx_clone.recv_async().await {
_ = async_cx.update_global::<ChatRegistry, _>(|state, cx| {
state.push(event, cx);
});
@@ -210,10 +224,14 @@ async fn main() {
.detach();
cx.spawn(|async_cx| async move {
while let Some(_n) = all_chats_rx.recv().await {
_ = async_cx.update_global::<ChatRegistry, _>(|state, cx| {
state.load(cx);
});
let all_messages = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
while let Some(subscription_id) = eose_rx.recv().await {
if subscription_id == all_messages {
_ = async_cx.update_global::<ChatRegistry, _>(|state, cx| {
state.load(cx);
});
}
}
})
.detach();