wip: refactor
This commit is contained in:
@@ -10,13 +10,17 @@ use std::{
|
||||
sync::{Arc, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{sync::mpsc, time::sleep};
|
||||
use tokio::{
|
||||
sync::{mpsc, Mutex},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
use constants::{ALL_MESSAGES_SUB_ID, APP_NAME, FAKE_SIG, METADATA_DELAY, NEW_MESSAGE_SUB_ID};
|
||||
use states::{
|
||||
account::AccountRegistry,
|
||||
chat::ChatRegistry,
|
||||
metadata::{MetadataRegistry, Signal},
|
||||
metadata::MetadataRegistry,
|
||||
signal::{Signal, SignalRegistry},
|
||||
};
|
||||
use views::app::AppView;
|
||||
|
||||
@@ -27,6 +31,7 @@ pub mod utils;
|
||||
pub mod views;
|
||||
|
||||
actions!(main_menu, [Quit]);
|
||||
actions!(app, [ReloadMetadata]);
|
||||
|
||||
static CLIENT: OnceLock<Client> = OnceLock::new();
|
||||
|
||||
@@ -75,21 +80,15 @@ async fn main() {
|
||||
// Connect to all relays
|
||||
_ = client.connect().await;
|
||||
|
||||
// Channel for EOSE
|
||||
// When receive EOSE from relay(s) -> Load all rooms and push it into UI.
|
||||
let (eose_tx, mut eose_rx) = mpsc::channel::<SubscriptionId>(200);
|
||||
// Signal
|
||||
let (signal_tx, mut signal_rx) = mpsc::channel::<Signal>(10000);
|
||||
let (mta_tx, mut mta_rx) = mpsc::unbounded_channel::<PublicKey>();
|
||||
|
||||
// Channel for new message
|
||||
// Push new message to chat panel or create new chat room if not exist.
|
||||
let (message_tx, message_rx) = flume::unbounded::<Event>();
|
||||
let message_rx_clone = message_rx.clone();
|
||||
|
||||
// Channel for signal
|
||||
// Merge all metadata requests into single one.
|
||||
// Notify to reload element if receive new metadata.
|
||||
let (signal_tx, mut signal_rx) = mpsc::channel::<Signal>(5000);
|
||||
let signal_tx_clone = signal_tx.clone();
|
||||
// Re use sender
|
||||
let mta_tx_clone = mta_tx.clone();
|
||||
|
||||
// Handle notification from Relays
|
||||
// Send notfiy back to GPUI
|
||||
tokio::spawn(async move {
|
||||
let sig = Signature::from_str(FAKE_SIG).unwrap();
|
||||
let new_message = SubscriptionId::new(NEW_MESSAGE_SUB_ID);
|
||||
@@ -129,19 +128,19 @@ async fn main() {
|
||||
|
||||
// Send event back to channel
|
||||
if subscription_id == new_message {
|
||||
if let Err(e) = message_tx.send_async(ev).await {
|
||||
if let Err(e) = signal_tx.send(Signal::RecvEvent(ev)).await {
|
||||
println!("Error: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if event.kind == Kind::Metadata {
|
||||
if let Err(e) = signal_tx.send(Signal::DONE(event.pubkey)).await {
|
||||
if let Err(e) = signal_tx.send(Signal::RecvMetadata(event.pubkey)).await {
|
||||
println!("Error: {}", e)
|
||||
}
|
||||
}
|
||||
} else if let RelayMessage::EndOfStoredEvents(subscription_id) = message {
|
||||
if let Err(e) = eose_tx.send(subscription_id).await {
|
||||
if let Err(e) = signal_tx.send(Signal::RecvEose(subscription_id)).await {
|
||||
println!("Error: {}", e)
|
||||
}
|
||||
}
|
||||
@@ -149,6 +148,44 @@ async fn main() {
|
||||
}
|
||||
});
|
||||
|
||||
// Handle metadata request
|
||||
// Merge all requests into single subscription
|
||||
tokio::spawn(async move {
|
||||
let queue: Arc<Mutex<HashSet<PublicKey>>> = Arc::new(Mutex::new(HashSet::new()));
|
||||
let queue_clone = queue.clone();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel::<PublicKey>(200);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(public_key) = mta_rx.recv().await {
|
||||
queue_clone.lock().await.insert(public_key);
|
||||
_ = tx.send(public_key).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while rx.recv().await.is_some() {
|
||||
sleep(Duration::from_millis(METADATA_DELAY)).await;
|
||||
|
||||
let authors: Vec<PublicKey> = queue.lock().await.drain().collect();
|
||||
let total = authors.len();
|
||||
|
||||
if total > 0 {
|
||||
let filter = Filter::new()
|
||||
.authors(authors)
|
||||
.kind(Kind::Metadata)
|
||||
.limit(total);
|
||||
let opts = SubscribeAutoCloseOptions::default()
|
||||
.filter(FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2)));
|
||||
|
||||
if let Err(e) = client.subscribe(vec![filter], Some(opts)).await {
|
||||
println!("Error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
App::new()
|
||||
.with_assets(Assets)
|
||||
.with_http_client(Arc::new(reqwest_client::ReqwestClient::new()))
|
||||
@@ -156,9 +193,11 @@ async fn main() {
|
||||
// Account state
|
||||
AccountRegistry::set_global(cx);
|
||||
// Metadata state
|
||||
MetadataRegistry::set_global(cx, signal_tx_clone);
|
||||
MetadataRegistry::set_global(cx);
|
||||
// Chat state
|
||||
ChatRegistry::set_global(cx, message_rx);
|
||||
ChatRegistry::set_global(cx);
|
||||
// Signal state
|
||||
SignalRegistry::set_global(cx, mta_tx_clone);
|
||||
|
||||
// Initialize components
|
||||
coop_ui::init(cx);
|
||||
@@ -166,71 +205,56 @@ async fn main() {
|
||||
// Set quit action
|
||||
cx.on_action(quit);
|
||||
|
||||
/*
|
||||
cx.spawn(|async_cx| async move {
|
||||
let mut queue: HashSet<PublicKey> = HashSet::new();
|
||||
let accounts = get_all_accounts_from_keyring();
|
||||
|
||||
while let Some(signal) = signal_rx.recv().await {
|
||||
match signal {
|
||||
Signal::REQ(public_key) => {
|
||||
queue.insert(public_key);
|
||||
// Automatically Login if only habe 1 account
|
||||
if let Some(account) = accounts.into_iter().next() {
|
||||
if let Ok(keys) = get_keys_by_account(account) {
|
||||
get_client().set_signer(keys).await;
|
||||
|
||||
// Wait for METADATA_DELAY
|
||||
sleep(Duration::from_millis(METADATA_DELAY)).await;
|
||||
|
||||
if !queue.is_empty() {
|
||||
let authors: Vec<PublicKey> = queue.iter().copied().collect();
|
||||
let total = authors.len();
|
||||
|
||||
let filter = Filter::new()
|
||||
.authors(authors)
|
||||
.kind(Kind::Metadata)
|
||||
.limit(total);
|
||||
|
||||
let opts = SubscribeAutoCloseOptions::default().filter(
|
||||
FilterOptions::WaitDurationAfterEOSE(Duration::from_secs(2)),
|
||||
);
|
||||
|
||||
queue.clear();
|
||||
|
||||
async_cx
|
||||
.background_executor()
|
||||
.spawn(async move {
|
||||
if let Err(e) =
|
||||
client.subscribe(vec![filter], Some(opts)).await
|
||||
{
|
||||
println!("Error: {}", e);
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Signal::DONE(public_key) => {
|
||||
_ = async_cx.update_global::<MetadataRegistry, _>(|state, _| {
|
||||
state.seen(public_key);
|
||||
});
|
||||
}
|
||||
_ = async_cx.update_global::<AccountRegistry, _>(|state, _| {
|
||||
state.set_user(Some(account));
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
cx.spawn(|async_cx| async move {
|
||||
while let Ok(event) = message_rx_clone.recv_async().await {
|
||||
_ = async_cx.update_global::<ChatRegistry, _>(|state, cx| {
|
||||
state.push(event, cx);
|
||||
});
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
*/
|
||||
|
||||
cx.spawn(|async_cx| async move {
|
||||
let all_messages = SubscriptionId::new(ALL_MESSAGES_SUB_ID);
|
||||
let mut is_initialized = false;
|
||||
|
||||
while let Some(subscription_id) = eose_rx.recv().await {
|
||||
if subscription_id == all_messages {
|
||||
_ = async_cx.update_global::<ChatRegistry, _>(|state, cx| {
|
||||
state.load(cx);
|
||||
});
|
||||
while let Some(signal) = signal_rx.recv().await {
|
||||
match signal {
|
||||
Signal::RecvEose(id) => {
|
||||
if id == all_messages {
|
||||
if !is_initialized {
|
||||
_ = async_cx.update_global::<ChatRegistry, _>(|state, _| {
|
||||
state.set_init();
|
||||
});
|
||||
|
||||
is_initialized = true;
|
||||
} else {
|
||||
_ = async_cx.update_global::<ChatRegistry, _>(|state, _| {
|
||||
state.set_reload();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Signal::RecvMetadata(public_key) => {
|
||||
_ = async_cx.update_global::<MetadataRegistry, _>(|state, _cx| {
|
||||
state.seen(public_key);
|
||||
})
|
||||
}
|
||||
Signal::RecvEvent(event) => {
|
||||
_ = async_cx.update_global::<ChatRegistry, _>(|state, _| {
|
||||
state.push(event);
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user