Store DM rumors without re-signing (#192)

Co-authored-by: alltheseas <alltheseas@users.noreply.github.com>
This commit is contained in:
alltheseas
2025-10-24 19:39:02 -05:00
committed by GitHub
parent 47abd2909b
commit 48c90f5bb0
4 changed files with 119 additions and 116 deletions

View File

@@ -166,7 +166,7 @@ impl Chat {
match signal {
RoomSignal::NewMessage((gift_wrap_id, event)) => {
let gift_wrap_id = gift_wrap_id.to_owned();
let message = Message::user(event);
let message = Message::user(event.clone());
cx.spawn_in(window, async move |this, cx| {
let states = app_state();
@@ -423,8 +423,8 @@ impl Chat {
}
/// Convert and insert a vector of nostr events into the chat panel
fn insert_messages(&mut self, events: Vec<Event>, cx: &mut Context<Self>) {
for event in events.into_iter() {
fn insert_messages(&mut self, events: Vec<UnsignedEvent>, cx: &mut Context<Self>) {
for event in events {
let m = Message::user(event);
// Bulk inserting messages, so no need to scroll to the latest message
self.insert_message(m, false, cx);

View File

@@ -9,7 +9,6 @@ use fuzzy_matcher::FuzzyMatcher;
use gpui::{
App, AppContext, AsyncApp, Context, Entity, EventEmitter, Global, Task, WeakEntity, Window,
};
use itertools::Itertools;
use nostr_sdk::prelude::*;
use room::RoomKind;
use settings::AppSettings;
@@ -338,63 +337,64 @@ impl Registry {
let public_key = signer.get_public_key().await?;
let contacts = client.database().contacts_public_keys(public_key).await?;
// Get messages sent by the user
let sent = Filter::new()
.kind(Kind::PrivateDirectMessage)
.author(public_key);
let authored_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(
SingleLetterTag::lowercase(Alphabet::A),
public_key.to_hex(),
);
// Get messages received by the user
let recv = Filter::new()
.kind(Kind::PrivateDirectMessage)
.pubkey(public_key);
let addressed_filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.custom_tag(SingleLetterTag::lowercase(Alphabet::P), public_key);
let sent_events = client.database().query(sent).await?;
let recv_events = client.database().query(recv).await?;
let events = sent_events.merge(recv_events);
let authored = client.database().query(authored_filter).await?;
let addressed = client.database().query(addressed_filter).await?;
let events = authored.merge(addressed);
let mut rooms: HashSet<Room> = HashSet::new();
let mut grouped: HashMap<u64, Vec<UnsignedEvent>> = HashMap::new();
// Process each event and group by room hash
for event in events
.into_iter()
.sorted_by_key(|event| Reverse(event.created_at))
.filter(|ev| ev.tags.public_keys().peekable().peek().is_some())
{
// Parse the room from the nostr event
let room = Room::from(&event);
for raw in events.into_iter() {
match UnsignedEvent::from_json(&raw.content) {
Ok(rumor) => {
if rumor.tags.public_keys().peekable().peek().is_some() {
grouped.entry(rumor.uniq_id()).or_default().push(rumor);
}
}
Err(e) => log::warn!("Failed to parse stored rumor: {e}"),
}
}
for (_room_id, mut messages) in grouped.into_iter() {
messages.sort_by_key(|m| Reverse(m.created_at));
let Some(latest) = messages.first() else {
continue;
};
let mut room = Room::from(latest);
// Skip if the room is already in the set
if rooms.iter().any(|r| r.id == room.id) {
continue;
}
// Get all public keys from the event's tags
let mut public_keys: Vec<PublicKey> = room.members().to_vec();
public_keys.retain(|pk| pk != &public_key);
// Bypass screening flag
let mut bypassed = false;
let user_sent = messages.iter().any(|m| m.pubkey == public_key);
// If the user has enabled bypass screening in settings,
// check if any of the room's members are contacts of the current user
let mut bypassed = false;
if bypass_setting {
bypassed = public_keys.iter().any(|k| contacts.contains(k));
}
// Check if the current user has sent at least one message to this room
let filter = Filter::new()
.kind(Kind::PrivateDirectMessage)
.author(public_key)
.pubkeys(public_keys);
// If current user has sent a message at least once, mark as ongoing
let is_ongoing = client.database().count(filter).await.unwrap_or(1) >= 1;
if is_ongoing || bypassed {
rooms.insert(room.kind(RoomKind::Ongoing));
} else {
rooms.insert(room);
if user_sent || bypassed {
room = room.kind(RoomKind::Ongoing);
}
rooms.insert(room);
}
Ok(rooms)
@@ -482,7 +482,7 @@ impl Registry {
pub fn event_to_message(
&mut self,
gift_wrap: EventId,
event: Event,
event: UnsignedEvent,
window: &mut Window,
cx: &mut Context<Self>,
) {
@@ -495,11 +495,13 @@ impl Registry {
if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) {
let is_new_event = event.created_at > room.read(cx).created_at;
let created_at = event.created_at;
let event_for_emit = event.clone();
// Update room
room.update(cx, |this, cx| {
if is_new_event {
this.set_created_at(event.created_at, cx);
this.set_created_at(created_at, cx);
}
// Set this room is ongoing if the new message is from current user
@@ -508,8 +510,9 @@ impl Registry {
}
// Emit the new message to the room
let event_to_emit = event_for_emit.clone();
cx.defer_in(window, move |this, _window, cx| {
this.emit_message(gift_wrap, event, cx);
this.emit_message(gift_wrap, event_to_emit, cx);
});
});

View File

@@ -71,7 +71,7 @@ impl SendReport {
#[derive(Debug, Clone)]
pub enum RoomSignal {
NewMessage((EventId, Box<Event>)),
NewMessage((EventId, UnsignedEvent)),
Refresh,
}
@@ -359,65 +359,37 @@ impl Room {
}
/// Loads all messages for this room from the database
pub fn load_messages(&self, cx: &App) -> Task<Result<Vec<Event>, Error>> {
let members = self.members.clone();
pub fn load_messages(&self, cx: &App) -> Task<Result<Vec<UnsignedEvent>, Error>> {
let conversation_id = self.id.to_string();
cx.background_spawn(async move {
let client = app_state().client();
let signer = client.signer().await?;
let public_key = signer.get_public_key().await?;
let sent_ids: Vec<EventId> = app_state()
.tracker()
.read()
.await
.sent_ids()
.iter()
.copied()
.collect();
// Get seen events from database
let filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.identifiers(sent_ids);
.custom_tag(
SingleLetterTag::lowercase(Alphabet::C),
conversation_id.as_str(),
);
let seen_events = client.database().query(filter).await?;
let stored = client.database().query(filter).await?;
let mut messages = Vec::with_capacity(stored.len());
// Extract seen event IDs
let seen_ids: Vec<EventId> = seen_events
.into_iter()
.filter_map(|event| event.tags.event_ids().next().copied())
.collect();
for event in stored {
match UnsignedEvent::from_json(&event.content) {
Ok(rumor) => messages.push(rumor),
Err(e) => log::warn!("Failed to parse stored rumor: {e}"),
}
}
// Get events that sent by current user
let filter = Filter::new()
.kind(Kind::PrivateDirectMessage)
.author(public_key)
.pubkeys(members.clone());
messages.sort_by_key(|message| message.created_at);
let sent_events = client.database().query(filter).await?;
// Get events that received by current user
let filter = Filter::new()
.kind(Kind::PrivateDirectMessage)
.authors(members)
.pubkey(public_key);
let recv_events = client.database().query(filter).await?;
// Merge events
let events: Vec<Event> = sent_events
.merge(recv_events)
.into_iter()
.filter(|event| !seen_ids.contains(&event.id))
.collect();
Ok(events)
Ok(messages)
})
}
/// Emits a new message signal to the current room
pub fn emit_message(&self, gift_wrap_id: EventId, event: Event, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, Box::new(event))));
pub fn emit_message(&self, gift_wrap_id: EventId, event: UnsignedEvent, cx: &mut Context<Self>) {
cx.emit(RoomSignal::NewMessage((gift_wrap_id, event)));
}
/// Emits a signal to refresh the current room's messages.

View File

@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
@@ -57,7 +58,7 @@ pub enum SignalKind {
NewProfile(Profile),
/// A signal to notify UI that a new gift wrap event has been received
NewMessage((EventId, Event)),
NewMessage((EventId, UnsignedEvent)),
/// A signal to notify UI that no messaging relays for current user was found
MessagingRelaysNotFound,
@@ -685,37 +686,53 @@ impl AppState {
}
/// Stores an unwrapped event in local database with reference to original
async fn set_rumor(&self, id: EventId, rumor: &Event) -> Result<(), Error> {
// Save unwrapped event
self.client.database().save_event(rumor).await?;
async fn set_rumor(&self, id: EventId, rumor: &UnsignedEvent) -> Result<(), Error> {
let rumor_id = rumor
.id
.ok_or_else(|| anyhow!("Rumor is missing an event id"))?;
let author_hex = rumor.pubkey.to_hex();
let conversation = Self::conversation_id(rumor).to_string();
// Create a reference event pointing to the unwrapped event
let event = EventBuilder::new(Kind::ApplicationSpecificData, "")
.tags(vec![Tag::identifier(id), Tag::event(rumor.id)])
let mut tags = rumor.tags.clone().to_vec();
tags.push(Tag::identifier(id));
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::A)),
[author_hex],
));
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::C)),
[conversation],
));
tags.push(Tag::event(rumor_id));
for receiver in rumor.tags.public_keys().copied() {
tags.push(Tag::custom(
TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::P)),
[receiver.to_hex()],
));
}
let content = rumor.as_json();
let event = EventBuilder::new(Kind::ApplicationSpecificData, content)
.tags(tags)
.sign(&Keys::generate())
.await?;
// Save reference event
self.client.database().save_event(&event).await?;
Ok(())
}
/// Retrieves a previously unwrapped event from local database
async fn get_rumor(&self, id: EventId) -> Result<Event, Error> {
async fn get_rumor(&self, id: EventId) -> Result<UnsignedEvent, Error> {
let filter = Filter::new()
.kind(Kind::ApplicationSpecificData)
.identifier(id)
.limit(1);
if let Some(event) = self.client.database().query(filter).await?.first_owned() {
let target_id = event.tags.event_ids().collect::<Vec<_>>()[0];
if let Some(event) = self.client.database().event_by_id(target_id).await? {
Ok(event)
} else {
Err(anyhow!("Event not found."))
}
UnsignedEvent::from_json(event.content).map_err(|e| anyhow!(e))
} else {
Err(anyhow!("Event is not cached yet."))
}
@@ -723,13 +740,13 @@ impl AppState {
// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor(&self, gift_wrap: &Event) {
let mut rumor: Option<Event> = None;
let mut rumor: Option<UnsignedEvent> = None;
if let Ok(event) = self.get_rumor(gift_wrap.id).await {
rumor = Some(event);
} else if let Ok(unwrapped) = self.client.unwrap_gift_wrap(gift_wrap).await {
let sender = unwrapped.sender;
let rumor_unsigned = unwrapped.rumor;
let mut rumor_unsigned = unwrapped.rumor;
if !Self::verify_rumor_sender(sender, &rumor_unsigned) {
log::warn!(
@@ -738,13 +755,14 @@ impl AppState {
sender,
rumor_unsigned.pubkey
);
} else if let Ok(event) = rumor_unsigned.clone().sign_with_keys(&Keys::generate()) {
// Save this event to the database for future use.
if let Err(e) = self.set_rumor(gift_wrap.id, &event).await {
log::warn!("Failed to cache unwrapped event: {e}")
}
} else {
rumor_unsigned.ensure_id();
rumor = Some(event);
if let Err(e) = self.set_rumor(gift_wrap.id, &rumor_unsigned).await {
log::warn!("Failed to cache unwrapped event: {e}")
} else {
rumor = Some(rumor_unsigned);
}
}
}
@@ -769,6 +787,16 @@ impl AppState {
}
}
fn conversation_id(rumor: &UnsignedEvent) -> u64 {
let mut hasher = DefaultHasher::new();
let mut pubkeys: Vec<PublicKey> = rumor.tags.public_keys().copied().collect();
pubkeys.push(rumor.pubkey);
pubkeys.sort();
pubkeys.dedup();
pubkeys.hash(&mut hasher);
hasher.finish()
}
fn verify_rumor_sender(sender: PublicKey, rumor: &UnsignedEvent) -> bool {
rumor.pubkey == sender
}