chore: fix missing message during initial fetch (#110)
* remove fetched flag * . * improve
This commit is contained in:
@@ -6,10 +6,10 @@ use anyhow::{anyhow, Error};
|
||||
use assets::Assets;
|
||||
use common::event::EventUtils;
|
||||
use global::constants::{
|
||||
ALL_MESSAGES_ID, APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT,
|
||||
METADATA_BATCH_TIMEOUT, NEW_MESSAGE_ID, SEARCH_RELAYS,
|
||||
APP_ID, APP_NAME, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT, METADATA_BATCH_TIMEOUT,
|
||||
SEARCH_RELAYS, WAIT_FOR_FINISH,
|
||||
};
|
||||
use global::{nostr_client, set_all_gift_wraps_fetched, NostrSignal};
|
||||
use global::{gift_wrap_sub_id, nostr_client, starting_time, NostrSignal};
|
||||
use gpui::{
|
||||
actions, point, px, size, App, AppContext, Application, Bounds, KeyBinding, Menu, MenuItem,
|
||||
SharedString, TitlebarOptions, WindowBackgroundAppearance, WindowBounds, WindowDecorations,
|
||||
@@ -39,6 +39,9 @@ fn main() {
|
||||
// Initialize the Nostr Client
|
||||
let client = nostr_client();
|
||||
|
||||
// Initialize the starting time
|
||||
let _ = starting_time();
|
||||
|
||||
// Initialize the Application
|
||||
let app = Application::new()
|
||||
.with_assets(Assets)
|
||||
@@ -46,7 +49,7 @@ fn main() {
|
||||
|
||||
let (signal_tx, signal_rx) = channel::bounded::<NostrSignal>(2048);
|
||||
let (mta_tx, mta_rx) = channel::bounded::<PublicKey>(1024);
|
||||
let (event_tx, event_rx) = channel::unbounded::<Event>();
|
||||
let (event_tx, event_rx) = channel::bounded::<Event>(2048);
|
||||
|
||||
let signal_tx_clone = signal_tx.clone();
|
||||
let mta_tx_clone = mta_tx.clone();
|
||||
@@ -131,7 +134,7 @@ fn main() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let duration = smol::Timer::after(Duration::from_secs(30));
|
||||
let duration = smol::Timer::after(Duration::from_secs(WAIT_FOR_FINISH));
|
||||
|
||||
let recv = || async {
|
||||
// no inline
|
||||
@@ -145,8 +148,7 @@ fn main() {
|
||||
|
||||
match smol::future::or(recv(), timeout()).await {
|
||||
Some(event) => {
|
||||
// Process the gift wrap event unwrapping
|
||||
let cached = try_unwrap_event(&signal_tx, &mta_tx, &event, false).await;
|
||||
let cached = try_unwrap_event(&event, &signal_tx, &mta_tx).await;
|
||||
|
||||
// Increment the total messages counter if message is not from cache
|
||||
if !cached {
|
||||
@@ -163,17 +165,9 @@ fn main() {
|
||||
None => {
|
||||
// Notify the UI that the processing is finished
|
||||
signal_tx.send(NostrSignal::Finish).await.ok();
|
||||
// Mark all gift wraps as fetched
|
||||
// For the next time Coop only needs to process new gift wraps
|
||||
set_all_gift_wraps_fetched().await;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Event channel is no longer needed when all gift wrap events have been processed
|
||||
event_rx.close();
|
||||
})
|
||||
.detach();
|
||||
|
||||
@@ -246,8 +240,6 @@ fn main() {
|
||||
|
||||
// Spawn a task to handle events from nostr channel
|
||||
cx.spawn_in(window, async move |_, cx| {
|
||||
let all_messages = SubscriptionId::new(ALL_MESSAGES_ID);
|
||||
|
||||
while let Ok(signal) = signal_rx.recv().await {
|
||||
cx.update(|window, cx| {
|
||||
let registry = Registry::global(cx);
|
||||
@@ -277,8 +269,8 @@ fn main() {
|
||||
}
|
||||
// Load chat rooms without setting as finished
|
||||
NostrSignal::Eose(subscription_id) => {
|
||||
// Only load chat rooms if the subscription ID matches the all_messages_sub_id
|
||||
if subscription_id == all_messages {
|
||||
// Only load chat rooms if the subscription matches the gift wrap subscription
|
||||
if gift_wrap_sub_id() == &subscription_id {
|
||||
registry.update(cx, |this, cx| {
|
||||
this.load_rooms(window, cx);
|
||||
});
|
||||
@@ -369,9 +361,7 @@ async fn handle_nostr_notifications(
|
||||
mta_tx: &Sender<PublicKey>,
|
||||
event_tx: &Sender<Event>,
|
||||
) -> Result<(), Error> {
|
||||
let new_messages = SubscriptionId::new(NEW_MESSAGE_ID);
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
|
||||
let mut notifications = client.notifications();
|
||||
let mut processed_events: BTreeSet<EventId> = BTreeSet::new();
|
||||
let mut processed_dm_relays: BTreeSet<PublicKey> = BTreeSet::new();
|
||||
@@ -382,10 +372,7 @@ async fn handle_nostr_notifications(
|
||||
};
|
||||
|
||||
match message {
|
||||
RelayMessage::Event {
|
||||
event,
|
||||
subscription_id,
|
||||
} => {
|
||||
RelayMessage::Event { event, .. } => {
|
||||
if processed_events.contains(&event.id) {
|
||||
continue;
|
||||
}
|
||||
@@ -394,14 +381,7 @@ async fn handle_nostr_notifications(
|
||||
|
||||
match event.kind {
|
||||
Kind::GiftWrap => {
|
||||
// Process to unwrap directly if event come from new messages subscription
|
||||
// Otherwise, send the event to the event_tx channel
|
||||
if *subscription_id == new_messages {
|
||||
log::info!("receive a new message: {:?}", event.id);
|
||||
try_unwrap_event(signal_tx, mta_tx, &event, true).await;
|
||||
} else {
|
||||
event_tx.send(event.into_owned()).await.ok();
|
||||
}
|
||||
event_tx.send(event.into_owned()).await.ok();
|
||||
}
|
||||
Kind::Metadata => {
|
||||
signal_tx
|
||||
@@ -428,13 +408,7 @@ async fn handle_nostr_notifications(
|
||||
.kind(Kind::InboxRelays)
|
||||
.limit(1);
|
||||
|
||||
if let Ok(output) = client.subscribe(filter, Some(opts)).await {
|
||||
log::info!(
|
||||
"Subscribed to get DM relays: {} - Relays: {:?}",
|
||||
event.pubkey.to_bech32().unwrap(),
|
||||
output.success
|
||||
)
|
||||
}
|
||||
client.subscribe(filter, Some(opts)).await.ok();
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -517,10 +491,9 @@ async fn get_unwrapped(root: EventId) -> Result<Event, Error> {
|
||||
|
||||
/// Unwraps a gift-wrapped event and processes its contents.
|
||||
async fn try_unwrap_event(
|
||||
event: &Event,
|
||||
signal_tx: &Sender<NostrSignal>,
|
||||
mta_tx: &Sender<PublicKey>,
|
||||
event: &Event,
|
||||
incoming: bool,
|
||||
) -> bool {
|
||||
let client = nostr_client();
|
||||
let mut is_cached = false;
|
||||
@@ -554,16 +527,13 @@ async fn try_unwrap_event(
|
||||
}
|
||||
};
|
||||
|
||||
// Get all pubkeys from the event
|
||||
let all_pubkeys = event.all_pubkeys();
|
||||
|
||||
// Send all pubkeys to the metadata batch to sync data
|
||||
for public_key in all_pubkeys {
|
||||
for public_key in event.all_pubkeys() {
|
||||
mta_tx.send(public_key).await.ok();
|
||||
}
|
||||
|
||||
// Send a notify to GPUI if this is a new message
|
||||
if incoming {
|
||||
if starting_time() <= &event.created_at {
|
||||
signal_tx.send(NostrSignal::GiftWrap(event)).await.ok();
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,9 @@ use ui::{
|
||||
|
||||
mod subject;
|
||||
|
||||
const DUPLICATE_TIME_WINDOW: u64 = 10;
|
||||
const MAX_RECENT_MESSAGES_TO_CHECK: usize = 5;
|
||||
|
||||
#[derive(Action, Clone, PartialEq, Eq, Deserialize)]
|
||||
#[action(namespace = chat, no_json)]
|
||||
pub struct ChangeSubject(pub String);
|
||||
@@ -131,7 +134,7 @@ impl Chat {
|
||||
RoomSignal::Refresh => {
|
||||
this.load_messages(window, cx);
|
||||
}
|
||||
}
|
||||
};
|
||||
},
|
||||
));
|
||||
|
||||
@@ -236,11 +239,16 @@ impl Chat {
|
||||
return false;
|
||||
}
|
||||
|
||||
let min_timestamp = new_msg.created_at.as_u64().saturating_sub(10);
|
||||
let messages = self.messages.read(cx);
|
||||
let min_timestamp = new_msg
|
||||
.created_at
|
||||
.as_u64()
|
||||
.saturating_sub(DUPLICATE_TIME_WINDOW);
|
||||
|
||||
self.messages
|
||||
.read(cx)
|
||||
messages
|
||||
.iter()
|
||||
.rev()
|
||||
.take(MAX_RECENT_MESSAGES_TO_CHECK)
|
||||
.filter(|m| m.author == identity)
|
||||
.any(|existing| {
|
||||
// Check if messages are within the time window
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
use global::constants::{NEW_MESSAGE_ID, NIP17_RELAYS};
|
||||
use global::constants::{GIFT_WRAP_SUB_ID, NIP17_RELAYS};
|
||||
use global::nostr_client;
|
||||
use gpui::prelude::FluentBuilder;
|
||||
use gpui::{
|
||||
@@ -208,11 +208,8 @@ impl MessagingRelays {
|
||||
_ = client.connect_relay(&relay).await;
|
||||
}
|
||||
|
||||
let id = SubscriptionId::new(NEW_MESSAGE_ID);
|
||||
let new_messages = Filter::new()
|
||||
.kind(Kind::GiftWrap)
|
||||
.pubkey(public_key)
|
||||
.limit(0);
|
||||
let id = SubscriptionId::new(GIFT_WRAP_SUB_ID);
|
||||
let new_messages = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
|
||||
|
||||
// Close old subscriptions
|
||||
client.unsubscribe(&id).await;
|
||||
|
||||
@@ -36,20 +36,21 @@ pub const NOSTR_CONNECT_RELAY: &str = "wss://relay.nsec.app";
|
||||
/// Default timeout (in seconds) for Nostr Connect
|
||||
pub const NOSTR_CONNECT_TIMEOUT: u64 = 200;
|
||||
|
||||
/// Unique ID for new message subscription.
|
||||
pub const NEW_MESSAGE_ID: &str = "listen_new_giftwraps";
|
||||
/// Unique ID for all messages subscription.
|
||||
pub const ALL_MESSAGES_ID: &str = "listen_all_giftwraps";
|
||||
/// Unique ID for all newest messages subscription.
|
||||
pub const ALL_NEWEST_MESSAGES_ID: &str = "listen_all_newest_giftwraps";
|
||||
/// Unique ID for all gift wraps subscription.
|
||||
pub const GIFT_WRAP_SUB_ID: &str = "listen_for_giftwraps";
|
||||
|
||||
/// Total metadata requests will be grouped.
|
||||
pub const METADATA_BATCH_LIMIT: usize = 100;
|
||||
|
||||
/// Maximum timeout for grouping metadata requests.
|
||||
pub const METADATA_BATCH_TIMEOUT: u64 = 400;
|
||||
|
||||
/// Maximum timeout for waiting for finish (seconds)
|
||||
pub const WAIT_FOR_FINISH: u64 = 60;
|
||||
|
||||
/// Default width for all modals.
|
||||
pub const DEFAULT_MODAL_WIDTH: f32 = 420.;
|
||||
|
||||
/// Default width of the sidebar.
|
||||
pub const DEFAULT_SIDEBAR_WIDTH: f32 = 280.;
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use nostr_connect::prelude::*;
|
||||
use nostr_sdk::prelude::*;
|
||||
use paths::nostr_file;
|
||||
|
||||
use crate::constants::GIFT_WRAP_SUB_ID;
|
||||
use crate::paths::support_dir;
|
||||
|
||||
pub mod constants;
|
||||
@@ -33,6 +34,8 @@ pub enum NostrSignal {
|
||||
}
|
||||
|
||||
static NOSTR_CLIENT: OnceLock<Client> = OnceLock::new();
|
||||
static GIFT_WRAP_ID: OnceLock<SubscriptionId> = OnceLock::new();
|
||||
static CURRENT_TIMESTAMP: OnceLock<Timestamp> = OnceLock::new();
|
||||
static FIRST_RUN: OnceLock<bool> = OnceLock::new();
|
||||
|
||||
pub fn nostr_client() -> &'static Client {
|
||||
@@ -61,6 +64,14 @@ pub fn nostr_client() -> &'static Client {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn gift_wrap_sub_id() -> &'static SubscriptionId {
|
||||
GIFT_WRAP_ID.get_or_init(|| SubscriptionId::new(GIFT_WRAP_SUB_ID))
|
||||
}
|
||||
|
||||
pub fn starting_time() -> &'static Timestamp {
|
||||
CURRENT_TIMESTAMP.get_or_init(Timestamp::now)
|
||||
}
|
||||
|
||||
pub fn first_run() -> &'static bool {
|
||||
FIRST_RUN.get_or_init(|| {
|
||||
let flag = support_dir().join(format!(".{}-first_run", env!("CARGO_PKG_VERSION")));
|
||||
@@ -75,16 +86,3 @@ pub fn first_run() -> &'static bool {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn set_all_gift_wraps_fetched() {
|
||||
let flag = support_dir().join(".fetched");
|
||||
|
||||
if !flag.exists() && smol::fs::write(&flag, "").await.is_err() {
|
||||
log::error!("Failed to create full run flag");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_gift_wraps_fetch_complete() -> bool {
|
||||
let flag = support_dir().join(".fetched");
|
||||
flag.exists()
|
||||
}
|
||||
|
||||
@@ -3,11 +3,8 @@ use std::time::Duration;
|
||||
use anyhow::{anyhow, Error};
|
||||
use client_keys::ClientKeys;
|
||||
use common::handle_auth::CoopAuthUrlHandler;
|
||||
use global::constants::{
|
||||
ACCOUNT_D, ALL_MESSAGES_ID, ALL_NEWEST_MESSAGES_ID, NEW_MESSAGE_ID, NIP17_RELAYS, NIP65_RELAYS,
|
||||
NOSTR_CONNECT_TIMEOUT,
|
||||
};
|
||||
use global::{is_gift_wraps_fetch_complete, nostr_client};
|
||||
use global::constants::{ACCOUNT_D, NIP17_RELAYS, NIP65_RELAYS, NOSTR_CONNECT_TIMEOUT};
|
||||
use global::{gift_wrap_sub_id, nostr_client};
|
||||
use gpui::prelude::FluentBuilder;
|
||||
use gpui::{
|
||||
div, red, App, AppContext, Context, Entity, Global, ParentElement, SharedString, Styled,
|
||||
@@ -632,13 +629,15 @@ impl Identity {
|
||||
}
|
||||
|
||||
pub(crate) async fn subscribe(client: &Client, public_key: PublicKey) -> Result<(), Error> {
|
||||
let all_messages = SubscriptionId::new(ALL_MESSAGES_ID);
|
||||
let all_newest_messages = SubscriptionId::new(ALL_NEWEST_MESSAGES_ID);
|
||||
let new_messages = SubscriptionId::new(NEW_MESSAGE_ID);
|
||||
// Subscription options
|
||||
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
|
||||
// Get the gift wraps fetch status
|
||||
let is_completed = is_gift_wraps_fetch_complete();
|
||||
|
||||
client
|
||||
.subscribe_with_id(
|
||||
gift_wrap_sub_id().to_owned(),
|
||||
Filter::new().kind(Kind::GiftWrap).pubkey(public_key),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
client
|
||||
.subscribe(
|
||||
@@ -660,37 +659,6 @@ impl Identity {
|
||||
)
|
||||
.await?;
|
||||
|
||||
client
|
||||
.subscribe_with_id(
|
||||
new_messages,
|
||||
Filter::new()
|
||||
.kind(Kind::GiftWrap)
|
||||
.pubkey(public_key)
|
||||
.limit(0),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if is_completed {
|
||||
let week_ago: u64 = 7 * 24 * 60 * 60;
|
||||
let since = Timestamp::from_secs(Timestamp::now().as_u64() - week_ago);
|
||||
|
||||
let filter = Filter::new()
|
||||
.pubkey(public_key)
|
||||
.kind(Kind::GiftWrap)
|
||||
.since(since);
|
||||
|
||||
client
|
||||
.subscribe_with_id(all_newest_messages, filter, Some(opts))
|
||||
.await?;
|
||||
} else {
|
||||
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
|
||||
|
||||
client
|
||||
.subscribe_with_id(all_messages, filter, Some(opts))
|
||||
.await?;
|
||||
};
|
||||
|
||||
log::info!("Getting all user's metadata and messages...");
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user