feat: improve negentropy sync
This commit is contained in:
@@ -10,12 +10,8 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
use tauri::{Emitter, Manager, State};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::{
|
||||
common::{get_latest_event, init_nip65},
|
||||
Nostr, NOTIFICATION_SUB_ID,
|
||||
};
|
||||
use crate::{Nostr, NOTIFICATION_SUB_ID};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
struct Account {
|
||||
@@ -284,7 +280,7 @@ pub async fn login(
|
||||
};
|
||||
|
||||
// NIP-65: Connect to user's relay list
|
||||
init_nip65(client, &public_key).await;
|
||||
// init_nip65(client, &public_key).await;
|
||||
|
||||
// NIP-03: Get user's contact list
|
||||
let contact_list = {
|
||||
@@ -325,8 +321,8 @@ pub async fn login(
|
||||
.reconcile(
|
||||
Filter::new()
|
||||
.authors(authors.clone())
|
||||
.kinds(vec![Kind::Metadata, Kind::ContactList, Kind::EventDeletion])
|
||||
.limit(authors.len() * 20),
|
||||
.kinds(vec![Kind::Metadata, Kind::ContactList])
|
||||
.limit(authors.len() * 10),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
@@ -339,8 +335,8 @@ pub async fn login(
|
||||
.reconcile(
|
||||
Filter::new()
|
||||
.authors(authors.clone())
|
||||
.kinds(vec![Kind::TextNote, Kind::Repost])
|
||||
.limit(authors.len() * 50),
|
||||
.kinds(vec![Kind::TextNote, Kind::Repost, Kind::EventDeletion])
|
||||
.limit(authors.len() * 40),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
@@ -350,84 +346,94 @@ pub async fn login(
|
||||
|
||||
// Create the trusted public key list from contact list
|
||||
// TODO: create a cached file
|
||||
let mut trusted_list: HashSet<PublicKey> = HashSet::new();
|
||||
if let Ok(events) = client
|
||||
.database()
|
||||
.query(vec![Filter::new().kind(Kind::ContactList)])
|
||||
.await
|
||||
{
|
||||
let keys: Vec<&str> = events
|
||||
.iter()
|
||||
.flat_map(|event| event.get_tags_content(TagKind::p()))
|
||||
.collect();
|
||||
|
||||
for author in authors.into_iter() {
|
||||
trusted_list.insert(author);
|
||||
|
||||
let filter = Filter::new()
|
||||
.author(author)
|
||||
.kind(Kind::ContactList)
|
||||
.limit(1);
|
||||
|
||||
if let Ok(events) = client.database().query(vec![filter]).await {
|
||||
if let Some(event) = get_latest_event(&events) {
|
||||
for tag in event.tags.iter() {
|
||||
if let Some(TagStandard::PublicKey {
|
||||
public_key,
|
||||
uppercase: false,
|
||||
..
|
||||
}) = tag.to_owned().to_standardized()
|
||||
{
|
||||
trusted_list.insert(public_key);
|
||||
};
|
||||
let trusted_list: HashSet<PublicKey> = keys
|
||||
.into_iter()
|
||||
.filter_map(|item| {
|
||||
if let Ok(pk) = PublicKey::from_str(item) {
|
||||
Some(pk)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Update app's state
|
||||
state.trusted_list.lock().await.clone_from(&trusted_list);
|
||||
|
||||
let trusted_users: Vec<PublicKey> = trusted_list.into_iter().collect();
|
||||
println!("Total trusted users: {}", trusted_users.len());
|
||||
|
||||
if let Ok(report) = client
|
||||
.reconcile(
|
||||
Filter::new()
|
||||
.authors(trusted_users)
|
||||
.kinds(vec![
|
||||
Kind::Metadata,
|
||||
Kind::TextNote,
|
||||
Kind::Repost,
|
||||
Kind::EventDeletion,
|
||||
])
|
||||
.limit(5000),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
// Update app's state
|
||||
state.trusted_list.lock().await.clone_from(&trusted_list);
|
||||
|
||||
// Syncing all user's events
|
||||
if let Ok(report) = client
|
||||
.reconcile(Filter::new().author(author), NegentropyOptions::default())
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
}
|
||||
|
||||
// Syncing all tagged events for current user
|
||||
if let Ok(report) = client
|
||||
.reconcile(
|
||||
Filter::new().pubkey(author).kinds(vec![
|
||||
Kind::TextNote,
|
||||
Kind::Repost,
|
||||
Kind::Reaction,
|
||||
Kind::ZapReceipt,
|
||||
]),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
}
|
||||
|
||||
// Syncing all events for trusted list
|
||||
let trusted: Vec<PublicKey> = trusted_list.into_iter().collect();
|
||||
if let Ok(report) = client
|
||||
.reconcile(
|
||||
Filter::new()
|
||||
.authors(trusted)
|
||||
.kinds(vec![
|
||||
Kind::Metadata,
|
||||
Kind::TextNote,
|
||||
Kind::Repost,
|
||||
Kind::EventDeletion,
|
||||
])
|
||||
.limit(30000),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
}
|
||||
|
||||
// Wait a little longer
|
||||
// TODO: remove?
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
// Syncing all user's events
|
||||
if let Ok(report) = client
|
||||
.reconcile(
|
||||
Filter::new().author(author).kinds(vec![
|
||||
Kind::TextNote,
|
||||
Kind::Repost,
|
||||
Kind::FollowSet,
|
||||
Kind::InterestSet,
|
||||
Kind::Interests,
|
||||
Kind::EventDeletion,
|
||||
Kind::MuteList,
|
||||
Kind::BookmarkSet,
|
||||
Kind::BlockedRelays,
|
||||
Kind::EmojiSet,
|
||||
Kind::RelaySet,
|
||||
Kind::RelayList,
|
||||
Kind::ApplicationSpecificData,
|
||||
]),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
}
|
||||
|
||||
// Syncing all tagged events for current user
|
||||
if let Ok(report) = client
|
||||
.reconcile(
|
||||
Filter::new().pubkey(author).kinds(vec![
|
||||
Kind::TextNote,
|
||||
Kind::Repost,
|
||||
Kind::Reaction,
|
||||
Kind::ZapReceipt,
|
||||
]),
|
||||
NegentropyOptions::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
println!("Received: {}", report.received.len())
|
||||
};
|
||||
|
||||
handle
|
||||
.emit("neg_synchronized", ())
|
||||
.expect("Something wrong!");
|
||||
|
||||
@@ -40,25 +40,26 @@ pub async fn get_event(id: String, state: State<'_, Nostr>) -> Result<RichEvent,
|
||||
|
||||
Ok(RichEvent { raw, parsed })
|
||||
} else {
|
||||
println!("Not found, getting event from relays...");
|
||||
match client
|
||||
.stream_events_of(vec![filter], Some(Duration::from_secs(10)))
|
||||
.get_events_of(
|
||||
vec![filter],
|
||||
EventSource::relays(Some(Duration::from_secs(10))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(mut rx) => {
|
||||
let mut raw: String = String::new();
|
||||
let mut parsed: Option<Meta> = None;
|
||||
|
||||
while let Some(event) = rx.next().await {
|
||||
raw = event.as_json();
|
||||
parsed = if event.kind == Kind::TextNote {
|
||||
Ok(events) => {
|
||||
if let Some(event) = get_latest_event(&events) {
|
||||
let raw = event.as_json();
|
||||
let parsed = if event.kind == Kind::TextNote {
|
||||
Some(parse_event(&event.content).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
Ok(RichEvent { raw, parsed })
|
||||
Ok(RichEvent { raw, parsed })
|
||||
} else {
|
||||
Err("Not found.".into())
|
||||
}
|
||||
}
|
||||
Err(err) => Err(err.to_string()),
|
||||
}
|
||||
|
||||
@@ -58,23 +58,23 @@ pub async fn get_profile(id: Option<String>, state: State<'_, Nostr>) -> Result<
|
||||
Err("Parse metadata failed".into())
|
||||
}
|
||||
} else {
|
||||
println!("Not found, getting event from relays...");
|
||||
match client
|
||||
.stream_events_of(vec![filter], Some(Duration::from_secs(10)))
|
||||
.get_events_of(
|
||||
vec![filter],
|
||||
EventSource::relays(Some(Duration::from_secs(10))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(mut rx) => {
|
||||
let mut metadata: String = Metadata::new().as_json();
|
||||
|
||||
while let Some(event) = rx.next().await {
|
||||
println!("Event: {}", event.as_json());
|
||||
if let Ok(m) = Metadata::from_json(&event.content) {
|
||||
metadata = m.as_json();
|
||||
break;
|
||||
Ok(events) => {
|
||||
if let Some(event) = get_latest_event(&events) {
|
||||
if let Ok(metadata) = Metadata::from_json(&event.content) {
|
||||
Ok(metadata.as_json())
|
||||
} else {
|
||||
Err("Metadata is not valid.".into())
|
||||
}
|
||||
} else {
|
||||
Err("Not found.".into())
|
||||
}
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
Err(e) => Err(e.to_string()),
|
||||
}
|
||||
|
||||
@@ -273,8 +273,12 @@ fn main() {
|
||||
println!("Add discovery relay failed: {}", e)
|
||||
}
|
||||
|
||||
if let Err(e) = client.add_discovery_relay("wss://user.kindpag.es/").await {
|
||||
println!("Add discovery relay failed: {}", e)
|
||||
}
|
||||
|
||||
// Connect
|
||||
client.connect_with_timeout(Duration::from_secs(20)).await;
|
||||
client.connect_with_timeout(Duration::from_secs(10)).await;
|
||||
|
||||
client
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user