Skip to content

Commit 525c8bf

Browse files
Other messaging improvements (#13750)
1 parent 5c7829c commit 525c8bf

File tree

4 files changed

+118
-93
lines changed

4 files changed

+118
-93
lines changed

packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Injectable } from '@nestjs/common';
1+
import { Injectable, Logger } from '@nestjs/common';
22

33
import { IsNull } from 'typeorm';
44

@@ -10,6 +10,7 @@ import { deleteUsingPagination } from 'src/modules/messaging/message-cleaner/uti
1010

1111
@Injectable()
1212
export class MessagingMessageCleanerService {
13+
private readonly logger = new Logger(MessagingMessageCleanerService.name);
1314
constructor(private readonly twentyORMManager: TwentyORMManager) {}
1415

1516
public async cleanWorkspaceThreads(workspaceId: string) {
@@ -57,6 +58,9 @@ export class MessagingMessageCleanerService {
5758
workspaceId: string,
5859
transactionManager?: WorkspaceEntityManager,
5960
) => {
61+
this.logger.log(
62+
`WorkspaceId: ${workspaceId} Deleting ${ids.length} messages from message cleaner`,
63+
);
6064
await messageRepository.delete(ids, transactionManager);
6165
},
6266
transactionManager,

packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts

Lines changed: 98 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,15 @@ export class MessagingMessageListFetchService {
9191
(messageList) => messageList.messageExternalIdsToDelete,
9292
);
9393

94-
const isFullSync = messageLists.every(
95-
(messageList) => !isNonEmptyString(messageList.previousSyncCursor),
96-
);
94+
const isFullSync =
95+
messageLists.every(
96+
(messageList) => !isNonEmptyString(messageList.previousSyncCursor),
97+
) && isNonEmptyString(messageChannel.syncCursor);
9798

9899
let totalMessagesToImportCount = 0;
99100

100101
this.logger.log(
101-
`messageChannelId: ${messageChannel.id} Is full sync: ${isFullSync} and message lists length: ${messageLists.length}`,
102+
`messageChannelId: ${messageChannel.id} Is full sync: ${isFullSync} and toImportCount: ${messageExternalIds.length}, toDeleteCount: ${messageExternalIdsToDelete.length}`,
102103
);
103104

104105
const messageChannelMessageAssociationRepository =
@@ -158,83 +159,12 @@ export class MessagingMessageListFetchService {
158159
);
159160
}
160161

161-
const fullSyncMessageChannelMessageAssociationsToDelete = [];
162-
163-
if (isFullSync) {
164-
const firstMessageChannelMessageAssociation =
165-
await messageChannelMessageAssociationRepository.findOne({
166-
where: {
167-
messageChannelId: messageChannelWithFreshTokens.id,
168-
},
169-
order: {
170-
id: 'ASC',
171-
},
172-
});
173-
174-
if (!isDefined(firstMessageChannelMessageAssociation)) {
175-
this.logger.log(
176-
`messageChannelId: ${messageChannel.id} Full sync: No message channel message associations found`,
177-
);
178-
179-
return;
180-
}
181-
182-
this.logger.log(
183-
`messageChannelId: ${messageChannel.id} Full sync: First message channel message association id: ${firstMessageChannelMessageAssociation.id}`,
184-
);
185-
186-
let nextFirstBatchMessageChannelMessageAssociationId:
187-
| string
188-
| undefined = firstMessageChannelMessageAssociation.id;
189-
let batchIndex = 0;
190-
191-
while (isDefined(nextFirstBatchMessageChannelMessageAssociationId)) {
192-
const existingMessageChannelMessageAssociations =
193-
await messageChannelMessageAssociationRepository.find({
194-
where: {
195-
messageChannelId: messageChannelWithFreshTokens.id,
196-
id: MoreThanOrEqual(
197-
nextFirstBatchMessageChannelMessageAssociationId,
198-
),
199-
},
200-
order: {
201-
id: 'ASC',
202-
},
203-
take: 200,
204-
});
205-
206-
if (existingMessageChannelMessageAssociations.length < 200) {
207-
nextFirstBatchMessageChannelMessageAssociationId = undefined;
208-
break;
209-
}
210-
211-
nextFirstBatchMessageChannelMessageAssociationId =
212-
existingMessageChannelMessageAssociations[
213-
existingMessageChannelMessageAssociations.length - 1
214-
].id;
215-
216-
batchIndex++;
217-
218-
const messageChannelMessageAssociationsToDelete =
219-
existingMessageChannelMessageAssociations.filter(
220-
(existingMessageChannelMessageAssociation) =>
221-
isDefined(
222-
existingMessageChannelMessageAssociation.messageExternalId,
223-
) &&
224-
!messageExternalIds.includes(
225-
existingMessageChannelMessageAssociation.messageExternalId,
226-
),
227-
);
228-
229-
this.logger.log(
230-
`messageChannelId: ${messageChannel.id} Full sync: Message channel message associations to delete in batch ${batchIndex}: ${messageChannelMessageAssociationsToDelete.length}`,
231-
);
232-
233-
fullSyncMessageChannelMessageAssociationsToDelete.push(
234-
...messageChannelMessageAssociationsToDelete,
235-
);
236-
}
237-
}
162+
const fullSyncMessageChannelMessageAssociationsToDelete = isFullSync
163+
? await this.computeFullSyncMessageChannelMessageAssociationsToDelete(
164+
messageChannel,
165+
messageExternalIds,
166+
)
167+
: [];
238168

239169
const allMessageExternalIdsToDelete = [
240170
...messageExternalIdsToDelete,
@@ -308,4 +238,91 @@ export class MessagingMessageListFetchService {
308238
);
309239
}
310240
}
241+
242+
private async computeFullSyncMessageChannelMessageAssociationsToDelete(
243+
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
244+
messageExternalIds: string[],
245+
) {
246+
const messageChannelMessageAssociationRepository =
247+
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
248+
'messageChannelMessageAssociation',
249+
);
250+
251+
const fullSyncMessageChannelMessageAssociationsToDelete = [];
252+
253+
const firstMessageChannelMessageAssociation =
254+
await messageChannelMessageAssociationRepository.findOne({
255+
where: {
256+
messageChannelId: messageChannel.id,
257+
},
258+
order: {
259+
id: 'ASC',
260+
},
261+
});
262+
263+
if (!isDefined(firstMessageChannelMessageAssociation)) {
264+
this.logger.log(
265+
`messageChannelId: ${messageChannel.id} Full sync: No message channel message associations found`,
266+
);
267+
268+
return [];
269+
}
270+
271+
this.logger.log(
272+
`messageChannelId: ${messageChannel.id} Full sync: First message channel message association id: ${firstMessageChannelMessageAssociation.id}`,
273+
);
274+
275+
let nextFirstBatchMessageChannelMessageAssociationId: string | undefined =
276+
firstMessageChannelMessageAssociation.id;
277+
let batchIndex = 0;
278+
279+
while (isDefined(nextFirstBatchMessageChannelMessageAssociationId)) {
280+
const existingMessageChannelMessageAssociations =
281+
await messageChannelMessageAssociationRepository.find({
282+
where: {
283+
messageChannelId: messageChannel.id,
284+
id: MoreThanOrEqual(
285+
nextFirstBatchMessageChannelMessageAssociationId,
286+
),
287+
},
288+
order: {
289+
id: 'ASC',
290+
},
291+
take: 200,
292+
});
293+
294+
const messageChannelMessageAssociationsToDelete =
295+
existingMessageChannelMessageAssociations.filter(
296+
(existingMessageChannelMessageAssociation) =>
297+
isDefined(
298+
existingMessageChannelMessageAssociation.messageExternalId,
299+
) &&
300+
!messageExternalIds.includes(
301+
existingMessageChannelMessageAssociation.messageExternalId,
302+
),
303+
);
304+
305+
this.logger.log(
306+
`messageChannelId: ${messageChannel.id} Full sync: Message channel message associations to delete in batch ${batchIndex}: ${messageChannelMessageAssociationsToDelete.length}`,
307+
);
308+
309+
fullSyncMessageChannelMessageAssociationsToDelete.push(
310+
...messageChannelMessageAssociationsToDelete,
311+
);
312+
313+
if (existingMessageChannelMessageAssociations.length < 200) {
314+
nextFirstBatchMessageChannelMessageAssociationId = undefined;
315+
break;
316+
}
317+
318+
nextFirstBatchMessageChannelMessageAssociationId =
319+
existingMessageChannelMessageAssociations[
320+
existingMessageChannelMessageAssociations.length - 1
321+
].id;
322+
323+
batchIndex++;
324+
}
325+
326+
return fullSyncMessageChannelMessageAssociationsToDelete;
327+
}
311328
}

packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,14 @@ export class MessagingMessagesImportService {
123123
blocklist.map((blocklistItem) => blocklistItem.handle),
124124
);
125125

126-
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreation(
127-
messagesToSave,
128-
messageChannel,
129-
connectedAccountWithFreshTokens,
130-
workspaceId,
131-
);
126+
if (messagesToSave.length > 0) {
127+
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreation(
128+
messagesToSave,
129+
messageChannel,
130+
connectedAccountWithFreshTokens,
131+
workspaceId,
132+
);
133+
}
132134

133135
if (
134136
messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE

packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@ export const filterEmails = (
1010
messages: MessageWithParticipants[],
1111
blocklist: string[],
1212
) => {
13+
const messagesWithoutIcsAttachments = filterOutIcsAttachments(messages);
14+
1315
const messagesWithoutBlocklisted = filterOutBlocklistedMessages(
1416
[primaryHandle, ...handleAliases],
15-
filterOutIcsAttachments(messages),
17+
messagesWithoutIcsAttachments,
1618
blocklist,
1719
);
1820

19-
if (isWorkEmail(primaryHandle)) {
20-
return filterOutInternals(primaryHandle, messagesWithoutBlocklisted);
21-
}
21+
const messagesWithoutInternals = isWorkEmail(primaryHandle)
22+
? filterOutInternals(primaryHandle, messagesWithoutBlocklisted)
23+
: messagesWithoutBlocklisted;
2224

23-
return messagesWithoutBlocklisted;
25+
return messagesWithoutInternals;
2426
};
2527

2628
const filterOutBlocklistedMessages = (

0 commit comments

Comments
 (0)