refactor person registry
This commit is contained in:
@@ -11,7 +11,7 @@ state = { path = "../state" }
|
||||
gpui.workspace = true
|
||||
nostr-sdk.workspace = true
|
||||
anyhow.workspace = true
|
||||
itertools.workspace = true
|
||||
smallvec.workspace = true
|
||||
smol.workspace = true
|
||||
flume.workspace = true
|
||||
log.workspace = true
|
||||
|
||||
@@ -17,64 +17,50 @@ impl Global for GlobalPersonRegistry {}
|
||||
#[derive(Debug)]
|
||||
pub struct PersonRegistry {
|
||||
/// Collection of all persons (user profiles)
|
||||
pub persons: HashMap<PublicKey, Entity<Profile>>,
|
||||
persons: HashMap<PublicKey, Entity<Profile>>,
|
||||
|
||||
/// Tasks for asynchronous operations
|
||||
_tasks: SmallVec<[Task<()>; 2]>,
|
||||
_tasks: SmallVec<[Task<()>; 3]>,
|
||||
}
|
||||
|
||||
impl PersonRegistry {
|
||||
/// Retrieve the global person registry state
|
||||
/// Retrieve the global person registry
|
||||
pub fn global(cx: &App) -> Entity<Self> {
|
||||
cx.global::<GlobalPersonRegistry>().0.clone()
|
||||
}
|
||||
|
||||
/// Set the global person registry instance
|
||||
pub(crate) fn set_global(state: Entity<Self>, cx: &mut App) {
|
||||
fn set_global(state: Entity<Self>, cx: &mut App) {
|
||||
cx.set_global(GlobalPersonRegistry(state));
|
||||
}
|
||||
|
||||
/// Create a new person registry instance
|
||||
pub(crate) fn new(cx: &mut Context<Self>) -> Self {
|
||||
fn new(cx: &mut Context<Self>) -> Self {
|
||||
let nostr = NostrRegistry::global(cx);
|
||||
let client = nostr.read(cx).client();
|
||||
|
||||
// Channel for communication between nostr and gpui
|
||||
let (tx, rx) = flume::bounded::<Profile>(100);
|
||||
|
||||
let mut tasks = smallvec![];
|
||||
|
||||
tasks.push(
|
||||
// Handle notifications
|
||||
cx.spawn({
|
||||
let client = nostr.read(cx).client();
|
||||
// Handle nostr notifications
|
||||
cx.background_spawn({
|
||||
let client = client.clone();
|
||||
|
||||
async move |this, cx| {
|
||||
let mut notifications = client.notifications();
|
||||
let mut processed_events = HashSet::new();
|
||||
async move { Self::handle_notifications(&client, &tx).await }
|
||||
}),
|
||||
);
|
||||
|
||||
while let Ok(notification) = notifications.recv().await {
|
||||
let RelayPoolNotification::Message { message, .. } = notification else {
|
||||
// Skip if the notification is not a message
|
||||
continue;
|
||||
};
|
||||
|
||||
if let RelayMessage::Event { event, .. } = message {
|
||||
if !processed_events.insert(event.id) {
|
||||
// Skip if the event has already been processed
|
||||
continue;
|
||||
}
|
||||
|
||||
if event.kind != Kind::Metadata {
|
||||
// Skip if the event is not a metadata event
|
||||
continue;
|
||||
};
|
||||
|
||||
let metadata = Metadata::from_json(&event.content).unwrap_or_default();
|
||||
let profile = Profile::new(event.pubkey, metadata);
|
||||
|
||||
this.update(cx, |this, cx| {
|
||||
this.insert_or_update_person(profile, cx);
|
||||
})
|
||||
.expect("Entity has been released")
|
||||
}
|
||||
}
|
||||
tasks.push(
|
||||
// Update GPUI state
|
||||
cx.spawn(async move |this, cx| {
|
||||
while let Ok(profile) = rx.recv_async().await {
|
||||
this.update(cx, |this, cx| {
|
||||
this.insert(profile, cx);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}),
|
||||
);
|
||||
@@ -83,18 +69,19 @@ impl PersonRegistry {
|
||||
// Load all user profiles from the database
|
||||
cx.spawn(async move |this, cx| {
|
||||
let result = cx
|
||||
.background_spawn(async move { Self::load_persons(&client).await })
|
||||
.background_executor()
|
||||
.await_on_background(async move { Self::load_persons(&client).await })
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(profiles) => {
|
||||
this.update(cx, |this, cx| {
|
||||
this.bulk_insert_persons(profiles, cx);
|
||||
this.bulk_inserts(profiles, cx);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to load persons: {e}");
|
||||
log::error!("Failed to load all persons from the database: {e}");
|
||||
}
|
||||
};
|
||||
}),
|
||||
@@ -106,6 +93,34 @@ impl PersonRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle nostr notifications
|
||||
async fn handle_notifications(client: &Client, tx: &flume::Sender<Profile>) {
|
||||
let mut notifications = client.notifications();
|
||||
let mut processed_events = HashSet::new();
|
||||
|
||||
while let Ok(notification) = notifications.recv().await {
|
||||
let RelayPoolNotification::Message { message, .. } = notification else {
|
||||
// Skip if the notification is not a message
|
||||
continue;
|
||||
};
|
||||
|
||||
if let RelayMessage::Event { event, .. } = message {
|
||||
if !processed_events.insert(event.id) {
|
||||
// Skip if the event has already been processed
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only process metadata events
|
||||
if event.kind == Kind::Metadata {
|
||||
let metadata = Metadata::from_json(&event.content).unwrap_or_default();
|
||||
let profile = Profile::new(event.pubkey, metadata);
|
||||
|
||||
tx.send_async(profile).await.ok();
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Load all user profiles from the database
|
||||
async fn load_persons(client: &Client) -> Result<Vec<Profile>, Error> {
|
||||
let filter = Filter::new().kind(Kind::Metadata).limit(200);
|
||||
@@ -123,7 +138,7 @@ impl PersonRegistry {
|
||||
}
|
||||
|
||||
/// Insert batch of persons
|
||||
fn bulk_insert_persons(&mut self, profiles: Vec<Profile>, cx: &mut Context<Self>) {
|
||||
fn bulk_inserts(&mut self, profiles: Vec<Profile>, cx: &mut Context<Self>) {
|
||||
for profile in profiles.into_iter() {
|
||||
self.persons
|
||||
.insert(profile.public_key(), cx.new(|_| profile));
|
||||
@@ -132,7 +147,7 @@ impl PersonRegistry {
|
||||
}
|
||||
|
||||
/// Insert or update a person
|
||||
pub fn insert_or_update_person(&mut self, profile: Profile, cx: &mut App) {
|
||||
pub fn insert(&mut self, profile: Profile, cx: &mut App) {
|
||||
let public_key = profile.public_key();
|
||||
|
||||
match self.persons.get(&public_key) {
|
||||
@@ -148,24 +163,12 @@ impl PersonRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get single person
|
||||
pub fn get_person(&self, public_key: &PublicKey, cx: &App) -> Profile {
|
||||
/// Get single person by public key
|
||||
pub fn get(&self, public_key: &PublicKey, cx: &App) -> Profile {
|
||||
self.persons
|
||||
.get(public_key)
|
||||
.map(|e| e.read(cx))
|
||||
.cloned()
|
||||
.unwrap_or(Profile::new(public_key.to_owned(), Metadata::default()))
|
||||
}
|
||||
|
||||
/// Get group of persons
|
||||
pub fn get_group_person(&self, public_keys: &[PublicKey], cx: &App) -> Vec<Profile> {
|
||||
let mut profiles = vec![];
|
||||
|
||||
for public_key in public_keys.iter() {
|
||||
let profile = self.get_person(public_key, cx);
|
||||
profiles.push(profile);
|
||||
}
|
||||
|
||||
profiles
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user