-
Notifications
You must be signed in to change notification settings - Fork 382
feat(llc, core, persistence): Add support for PredefinedFilters on QueryChannels
#2709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v10.0.0
Are you sure you want to change the base?
Changes from all commits
ff5c6ce
5dc6bd4
cfd8283
852cd16
6e2c1c2
c8fd41e
33bc631
420ecc9
7ebb1aa
68cde5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import 'package:rxdart/rxdart.dart'; | |
| import 'package:stream_chat/src/client/channel.dart'; | ||
| import 'package:stream_chat/src/client/channel_delivery_reporter.dart'; | ||
| import 'package:stream_chat/src/client/event_resolvers.dart' as event_resolvers; | ||
| import 'package:stream_chat/src/client/query_channels_result.dart'; | ||
| import 'package:stream_chat/src/client/retry_policy.dart'; | ||
| import 'package:stream_chat/src/core/api/attachment_file_uploader.dart'; | ||
| import 'package:stream_chat/src/core/api/requests.dart'; | ||
|
|
@@ -46,6 +47,7 @@ import 'package:stream_chat/src/core/util/extension.dart'; | |
| import 'package:stream_chat/src/core/util/immutable_collection_subjects.dart'; | ||
| import 'package:stream_chat/src/core/util/in_flight_cache.dart'; | ||
| import 'package:stream_chat/src/core/util/list_extensions.dart'; | ||
| import 'package:stream_chat/src/core/util/predefined_filter_defaults.dart'; | ||
| import 'package:stream_chat/src/core/util/utils.dart'; | ||
| import 'package:stream_chat/src/db/chat_persistence_client.dart'; | ||
| import 'package:stream_chat/src/event_type.dart'; | ||
|
|
@@ -643,12 +645,89 @@ class StreamChatClient { | |
| }); | ||
| } | ||
|
|
||
| final _queryChannelsCache = InFlightCache<String, List<Channel>>(); | ||
| final _queryChannelsCache = InFlightCache<String, QueryChannelsResult>(); | ||
|
|
||
| /// Requests channels with a given query. | ||
| /// | ||
| /// Either an inline [filter]/[channelStateSort] pair or a [predefinedFilter] | ||
| /// identifier (optionally interpolated with [filterValues] and [sortValues]) | ||
| /// can be supplied. | ||
| /// | ||
| /// Use [queryChannelsWithResult] if you also need the server-resolved | ||
| /// [PredefinedFilter] spec. | ||
| Stream<List<Channel>> queryChannels({ | ||
| Filter? filter, | ||
| SortOrder<ChannelState>? channelStateSort, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| bool state = true, | ||
| bool watch = true, | ||
| bool presence = false, | ||
| int? memberLimit, | ||
| int? messageLimit, | ||
| PaginationParams paginationParams = const PaginationParams(), | ||
| bool waitForConnect = true, | ||
| }) async* { | ||
| await for (final result in _queryChannelsImpl( | ||
| filter: filter, | ||
| channelStateSort: channelStateSort, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| state: state, | ||
| watch: watch, | ||
| presence: presence, | ||
| memberLimit: memberLimit, | ||
| messageLimit: messageLimit, | ||
| paginationParams: paginationParams, | ||
| waitForConnect: waitForConnect, | ||
| )) { | ||
| yield result.channels; | ||
| } | ||
| } | ||
|
|
||
| /// Requests channels with a given query, yielding a [QueryChannelsResult] | ||
| /// that carries both the live channel list and the server-resolved | ||
| /// [PredefinedFilter] spec (when one is associated with the query). | ||
| /// | ||
| /// Yields the offline-cached result first (when available), followed by | ||
| /// the online result. Concurrent identical online queries are coalesced | ||
| /// via [_queryChannelsCache]. | ||
| Stream<QueryChannelsResult> queryChannelsWithResult({ | ||
| Filter? filter, | ||
| SortOrder<ChannelState>? channelStateSort, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| bool state = true, | ||
| bool watch = true, | ||
| bool presence = false, | ||
| int? memberLimit, | ||
| int? messageLimit, | ||
| PaginationParams paginationParams = const PaginationParams(), | ||
| bool waitForConnect = true, | ||
| }) => _queryChannelsImpl( | ||
| filter: filter, | ||
| channelStateSort: channelStateSort, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| state: state, | ||
| watch: watch, | ||
| presence: presence, | ||
| memberLimit: memberLimit, | ||
| messageLimit: messageLimit, | ||
| paginationParams: paginationParams, | ||
| waitForConnect: waitForConnect, | ||
| ); | ||
|
|
||
| Stream<QueryChannelsResult> _queryChannelsImpl({ | ||
| Filter? filter, | ||
| SortOrder<ChannelState>? channelStateSort, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| bool state = true, | ||
| bool watch = true, | ||
| bool presence = false, | ||
|
|
@@ -665,6 +744,9 @@ class StreamChatClient { | |
| final hash = generateHash([ | ||
| filter, | ||
| channelStateSort, | ||
| predefinedFilter, | ||
| filterValues, | ||
| sortValues, | ||
| state, | ||
| watch, | ||
| presence, | ||
|
|
@@ -674,16 +756,19 @@ class StreamChatClient { | |
| ]); | ||
|
|
||
| // Per-caller offline emit — local persistence, not coalesced. | ||
| var offlineChannels = <Channel>[]; | ||
| QueryChannelsResult? offlineResult; | ||
| try { | ||
| offlineChannels = await queryChannelsOffline( | ||
| offlineResult = await _queryChannelsOfflineImpl( | ||
| filter: filter, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| channelStateSort: channelStateSort, | ||
| messageLimit: messageLimit, | ||
| paginationParams: paginationParams, | ||
| ); | ||
|
|
||
| if (offlineChannels.isNotEmpty) yield offlineChannels; | ||
| if (offlineResult.channels.isNotEmpty) yield offlineResult; | ||
| } catch (e, stk) { | ||
| logger.warning('Error querying channels offline', e, stk); | ||
| // Continue to online query even if offline fails | ||
|
|
@@ -696,9 +781,12 @@ class StreamChatClient { | |
| final result = await _queryChannelsCache.run( | ||
| hash, | ||
| () => | ||
| queryChannelsOnline( | ||
| _queryChannelsOnlineImpl( | ||
| filter: filter, | ||
| sort: channelStateSort, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| state: state, | ||
| watch: watch, | ||
| presence: presence, | ||
|
|
@@ -718,14 +806,48 @@ class StreamChatClient { | |
| } catch (e, stk) { | ||
| logger.severe('Error querying channels online', e, stk); | ||
| // Only rethrow if we have no channels to show the user | ||
| if (offlineChannels.isEmpty) rethrow; | ||
| if (offlineResult == null || offlineResult.channels.isEmpty) rethrow; | ||
| } | ||
| } | ||
|
|
||
| /// Requests channels with a given query from the API. | ||
| Future<List<Channel>> queryChannelsOnline({ | ||
| Filter? filter, | ||
| SortOrder<ChannelState>? sort, | ||
| String? predefinedFilter, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: At the moment I didn't expose public |
||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| bool state = true, | ||
| bool watch = true, | ||
| bool presence = false, | ||
| int? memberLimit, | ||
| int? messageLimit, | ||
| bool waitForConnect = true, | ||
| PaginationParams paginationParams = const PaginationParams(), | ||
| }) async { | ||
| final result = await _queryChannelsOnlineImpl( | ||
| filter: filter, | ||
| sort: sort, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| state: state, | ||
| watch: watch, | ||
| presence: presence, | ||
| memberLimit: memberLimit, | ||
| messageLimit: messageLimit, | ||
| waitForConnect: waitForConnect, | ||
| paginationParams: paginationParams, | ||
| ); | ||
| return result.channels; | ||
| } | ||
|
|
||
| Future<QueryChannelsResult> _queryChannelsOnlineImpl({ | ||
| Filter? filter, | ||
| SortOrder<ChannelState>? sort, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| bool state = true, | ||
| bool watch = true, | ||
| bool presence = false, | ||
|
|
@@ -756,6 +878,9 @@ class StreamChatClient { | |
| final res = await _chatApi.channel.queryChannels( | ||
| filter: filter, | ||
| sort: sort, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| state: state, | ||
| watch: watch, | ||
| presence: presence, | ||
|
|
@@ -771,7 +896,10 @@ class StreamChatClient { | |
| Please make sure to take a look at the Flutter tutorial: https://getstream.io/chat/flutter/tutorial | ||
| If your application already has users and channels, you might need to adjust your query channel as explained in the docs https://getstream.io/chat/docs/query_channels/?language=dart | ||
| '''); | ||
| return <Channel>[]; | ||
| return QueryChannelsResult( | ||
| channels: const [], | ||
| predefinedFilter: res.predefinedFilter, | ||
| ); | ||
| } | ||
|
|
||
| final channels = res.channels; | ||
|
|
@@ -786,40 +914,106 @@ class StreamChatClient { | |
| // Submit delivery report for the channels fetched in this query. | ||
| await channelDeliveryReporter.submitForDelivery(updateData.value); | ||
|
|
||
| await chatPersistenceClient?.updateChannelQueries( | ||
| filter, | ||
| channels.map((c) => c.channel!.cid).toList(), | ||
| // Clear the query cache if we are refreshing. | ||
| clearQueryCache: (paginationParams.offset ?? 0) == 0, | ||
| ); | ||
| final cachedCids = channels.map((c) => c.channel!.cid).toList(); | ||
| // Clear the query cache if we are refreshing. | ||
| final clearQueryCache = (paginationParams.offset ?? 0) == 0; | ||
|
|
||
| if (predefinedFilter == null) { | ||
| await chatPersistenceClient?.updateChannelQueries( | ||
| filter, | ||
| cachedCids, | ||
| clearQueryCache: clearQueryCache, | ||
| ); | ||
| } else { | ||
| // Note: predefinedFilter will never be null here | ||
| final resolvedFilter = res.predefinedFilter?.filter ?? const Filter.empty(); | ||
| final resolvedSort = res.predefinedFilter?.sort ?? resolvedFilter.predefinedFilterFallbackSort; | ||
|
|
||
| await chatPersistenceClient?.updateChannelQueriesByPredefinedFilter( | ||
| predefinedFilter, | ||
| cachedCids, | ||
| filter: resolvedFilter, | ||
| sort: resolvedSort, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| clearQueryCache: clearQueryCache, | ||
| ); | ||
| } | ||
|
|
||
| this.state.addChannels(updateData.key); | ||
| return updateData.value; | ||
| return QueryChannelsResult( | ||
| channels: updateData.value, | ||
| predefinedFilter: res.predefinedFilter, | ||
| ); | ||
| } | ||
|
|
||
| /// Requests channels with a given query from the Persistence client. | ||
| Future<List<Channel>> queryChannelsOffline({ | ||
| Filter? filter, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| SortOrder<ChannelState>? channelStateSort, | ||
| int? messageLimit, | ||
| PaginationParams paginationParams = const PaginationParams(), | ||
| }) async { | ||
| final offlineChannels = await chatPersistenceClient?.getChannelStates( | ||
| final result = await _queryChannelsOfflineImpl( | ||
| filter: filter, | ||
| predefinedFilter: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| channelStateSort: channelStateSort, | ||
| // Default limit is set to 25 in backend. | ||
| messageLimit: messageLimit ?? 25, | ||
| messageLimit: messageLimit, | ||
| paginationParams: paginationParams, | ||
| ); | ||
| return result.channels; | ||
| } | ||
|
|
||
| Future<QueryChannelsResult> _queryChannelsOfflineImpl({ | ||
| Filter? filter, | ||
| String? predefinedFilter, | ||
| Map<String, Object?>? filterValues, | ||
| Map<String, Object?>? sortValues, | ||
| SortOrder<ChannelState>? channelStateSort, | ||
| int? messageLimit, | ||
| PaginationParams paginationParams = const PaginationParams(), | ||
| }) async { | ||
| final QueryChannelsResponse res; | ||
| if (predefinedFilter == null) { | ||
| final channels = await chatPersistenceClient?.getChannelStates( | ||
| filter: filter, | ||
| channelStateSort: channelStateSort, | ||
| // Default limit is set to 25 in backend. | ||
| messageLimit: messageLimit ?? 25, | ||
| paginationParams: paginationParams, | ||
| ); | ||
| res = QueryChannelsResponse()..channels = channels ?? const []; | ||
| } else { | ||
| res = | ||
| await chatPersistenceClient?.getChannelStatesByPredefinedFilter( | ||
| filterName: predefinedFilter, | ||
| filterValues: filterValues, | ||
| sortValues: sortValues, | ||
| messageLimit: messageLimit ?? 25, | ||
| paginationParams: paginationParams, | ||
| ) ?? | ||
| (QueryChannelsResponse()..channels = const []); | ||
| } | ||
|
|
||
| if (offlineChannels == null || offlineChannels.isEmpty) { | ||
| if (res.channels.isEmpty) { | ||
| logger.info('No channels found in offline storage for the given query'); | ||
| return []; | ||
| return QueryChannelsResult( | ||
| channels: const [], | ||
| predefinedFilter: res.predefinedFilter, | ||
| ); | ||
| } | ||
|
|
||
| final updatedData = _mapChannelStateToChannel(offlineChannels); | ||
| state.addChannels(updatedData.key); | ||
| return updatedData.value; | ||
| final updateData = _mapChannelStateToChannel(res.channels); | ||
| state.addChannels(updateData.key); | ||
| return QueryChannelsResult( | ||
| channels: updateData.value, | ||
| predefinedFilter: res.predefinedFilter, | ||
| ); | ||
| } | ||
|
|
||
| MapEntry<Map<String, Channel>, List<Channel>> _mapChannelStateToChannel( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import 'package:stream_chat/src/client/channel.dart'; | ||
| import 'package:stream_chat/src/core/models/predefined_filter.dart'; | ||
|
|
||
| /// The result of a `queryChannelsWithResult` call on [StreamChatClient]. | ||
| /// | ||
| /// Carries the live [Channel] instances matching the query alongside the | ||
| /// server-resolved [PredefinedFilter] spec (when one is associated with the | ||
| /// query). | ||
| class QueryChannelsResult { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I introduced this new model to serve as a container for the result of the |
||
| /// Creates a new [QueryChannelsResult]. | ||
| const QueryChannelsResult({ | ||
| required this.channels, | ||
| this.predefinedFilter, | ||
| }); | ||
|
|
||
| /// The live [Channel] instances matching the query. | ||
| final List<Channel> channels; | ||
|
|
||
| /// The server-resolved predefined-filter spec, or null when the query did | ||
| /// not use a `predefinedFilter`. | ||
| final PredefinedFilter? predefinedFilter; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure about the naming here, I tried to make it as explicit as possible. (it wasn't really possible to use the existing
queryChannelswithout a breaking change, as we must change the return type).