Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- Added `Message.updateWith(Message? other)` — merges a server-side update onto the local message while preserving locally-known `poll`, `sharedLocation`, `ownReactions`, and nested `quotedMessage` enrichment when the server omits them.
- Added `Channel.isOneToOne` — true when the channel is `isDistinct` and has exactly two members. For the looser count-only check, inline `channel.memberCount == 2`.
- Added `IterableMergeX.merge` (keyed-map merge on `Iterable<T>`, unsorted output) and `SortedListX.mergeSorted` (two-pointer merge on a sorted `List<T>`). Splits what `SortedListX.merge` did in 9.24.0 — see Changed below.
- Added support for predefined filters for `QueryChannels` on `StreamChatClient` (`StreamChatClient.queryChannels` and `StreamChatClient.queryChannelsWithResult`).
- Added support for predefined filters for `QueryChannels` on `ChatPersistenceClient` (`ChatPersistenceClient.getChannelStatesByPredefinedFilter` and `ChatPersistenceClient.updateChannelQueriesByPredefinedFilter`),

⚠️ Deprecated

Expand Down
238 changes: 216 additions & 22 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/client/channel_delivery_reporter.dart';
import 'package:stream_chat/src/client/event_resolvers.dart' as event_resolvers;
import 'package:stream_chat/src/client/query_channels_result.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
import 'package:stream_chat/src/core/api/requests.dart';
Expand Down Expand Up @@ -46,6 +47,7 @@ import 'package:stream_chat/src/core/util/extension.dart';
import 'package:stream_chat/src/core/util/immutable_collection_subjects.dart';
import 'package:stream_chat/src/core/util/in_flight_cache.dart';
import 'package:stream_chat/src/core/util/list_extensions.dart';
import 'package:stream_chat/src/core/util/predefined_filter_defaults.dart';
import 'package:stream_chat/src/core/util/utils.dart';
import 'package:stream_chat/src/db/chat_persistence_client.dart';
import 'package:stream_chat/src/event_type.dart';
Expand Down Expand Up @@ -643,12 +645,89 @@ class StreamChatClient {
});
}

final _queryChannelsCache = InFlightCache<String, List<Channel>>();
final _queryChannelsCache = InFlightCache<String, QueryChannelsResult>();

/// Requests channels with a given query.
///
/// Either an inline [filter]/[channelStateSort] pair or a [predefinedFilter]
/// identifier (optionally interpolated with [filterValues] and [sortValues])
/// can be supplied.
///
/// Use [queryChannelsWithResult] if you also need the server-resolved
/// [PredefinedFilter] spec.
Stream<List<Channel>> queryChannels({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
bool waitForConnect = true,
}) async* {
await for (final result in _queryChannelsImpl(
filter: filter,
channelStateSort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
paginationParams: paginationParams,
waitForConnect: waitForConnect,
)) {
yield result.channels;
}
}

/// Requests channels with a given query, yielding a [QueryChannelsResult]
/// that carries both the live channel list and the server-resolved
/// [PredefinedFilter] spec (when one is associated with the query).
///
/// Yields the offline-cached result first (when available), followed by
/// the online result. Concurrent identical online queries are coalesced
/// via [_queryChannelsCache].
Stream<QueryChannelsResult> queryChannelsWithResult({
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure about the naming here, I tried to make it as explicit as possible. (it wasn't really possible to use the existing queryChannels without a breaking change, as we must change the return type).

Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
bool waitForConnect = true,
}) => _queryChannelsImpl(
filter: filter,
channelStateSort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
paginationParams: paginationParams,
waitForConnect: waitForConnect,
);

Stream<QueryChannelsResult> _queryChannelsImpl({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand All @@ -665,6 +744,9 @@ class StreamChatClient {
final hash = generateHash([
filter,
channelStateSort,
predefinedFilter,
filterValues,
sortValues,
state,
watch,
presence,
Expand All @@ -674,16 +756,19 @@ class StreamChatClient {
]);

// Per-caller offline emit — local persistence, not coalesced.
var offlineChannels = <Channel>[];
QueryChannelsResult? offlineResult;
try {
offlineChannels = await queryChannelsOffline(
offlineResult = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
messageLimit: messageLimit,
paginationParams: paginationParams,
);

if (offlineChannels.isNotEmpty) yield offlineChannels;
if (offlineResult.channels.isNotEmpty) yield offlineResult;
} catch (e, stk) {
logger.warning('Error querying channels offline', e, stk);
// Continue to online query even if offline fails
Expand All @@ -696,9 +781,12 @@ class StreamChatClient {
final result = await _queryChannelsCache.run(
hash,
() =>
queryChannelsOnline(
_queryChannelsOnlineImpl(
filter: filter,
sort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -718,14 +806,48 @@ class StreamChatClient {
} catch (e, stk) {
logger.severe('Error querying channels online', e, stk);
// Only rethrow if we have no channels to show the user
if (offlineChannels.isEmpty) rethrow;
if (offlineResult == null || offlineResult.channels.isEmpty) rethrow;
}
}

/// Requests channels with a given query from the API.
Future<List<Channel>> queryChannelsOnline({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: At the moment I didn't expose public queryChannelsWithResultOnline/queryChannelsWithResultOffline -> only the new queryChannelsWithResults is public. I didn't want to pollute the public API, with methods which most likely will not be used. Let me know if you think we should expose them as well.

Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
bool waitForConnect = true,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final result = await _queryChannelsOnlineImpl(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
waitForConnect: waitForConnect,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOnlineImpl({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand Down Expand Up @@ -756,6 +878,9 @@ class StreamChatClient {
final res = await _chatApi.channel.queryChannels(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -771,7 +896,10 @@ class StreamChatClient {
Please make sure to take a look at the Flutter tutorial: https://getstream.io/chat/flutter/tutorial
If your application already has users and channels, you might need to adjust your query channel as explained in the docs https://getstream.io/chat/docs/query_channels/?language=dart
''');
return <Channel>[];
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final channels = res.channels;
Expand All @@ -786,40 +914,106 @@ class StreamChatClient {
// Submit delivery report for the channels fetched in this query.
await channelDeliveryReporter.submitForDelivery(updateData.value);

await chatPersistenceClient?.updateChannelQueries(
filter,
channels.map((c) => c.channel!.cid).toList(),
// Clear the query cache if we are refreshing.
clearQueryCache: (paginationParams.offset ?? 0) == 0,
);
final cachedCids = channels.map((c) => c.channel!.cid).toList();
// Clear the query cache if we are refreshing.
final clearQueryCache = (paginationParams.offset ?? 0) == 0;

if (predefinedFilter == null) {
await chatPersistenceClient?.updateChannelQueries(
filter,
cachedCids,
clearQueryCache: clearQueryCache,
);
} else {
// Note: predefinedFilter will never be null here
final resolvedFilter = res.predefinedFilter?.filter ?? const Filter.empty();
final resolvedSort = res.predefinedFilter?.sort ?? resolvedFilter.predefinedFilterFallbackSort;

await chatPersistenceClient?.updateChannelQueriesByPredefinedFilter(
predefinedFilter,
cachedCids,
filter: resolvedFilter,
sort: resolvedSort,
filterValues: filterValues,
sortValues: sortValues,
clearQueryCache: clearQueryCache,
);
}

this.state.addChannels(updateData.key);
return updateData.value;
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

/// Requests channels with a given query from the Persistence client.
Future<List<Channel>> queryChannelsOffline({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final offlineChannels = await chatPersistenceClient?.getChannelStates(
final result = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
// Default limit is set to 25 in backend.
messageLimit: messageLimit ?? 25,
messageLimit: messageLimit,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOfflineImpl({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final QueryChannelsResponse res;
if (predefinedFilter == null) {
final channels = await chatPersistenceClient?.getChannelStates(
filter: filter,
channelStateSort: channelStateSort,
// Default limit is set to 25 in backend.
messageLimit: messageLimit ?? 25,
paginationParams: paginationParams,
);
res = QueryChannelsResponse()..channels = channels ?? const [];
} else {
res =
await chatPersistenceClient?.getChannelStatesByPredefinedFilter(
filterName: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
messageLimit: messageLimit ?? 25,
paginationParams: paginationParams,
) ??
(QueryChannelsResponse()..channels = const []);
}

if (offlineChannels == null || offlineChannels.isEmpty) {
if (res.channels.isEmpty) {
logger.info('No channels found in offline storage for the given query');
return [];
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final updatedData = _mapChannelStateToChannel(offlineChannels);
state.addChannels(updatedData.key);
return updatedData.value;
final updateData = _mapChannelStateToChannel(res.channels);
state.addChannels(updateData.key);
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

MapEntry<Map<String, Channel>, List<Channel>> _mapChannelStateToChannel(
Expand Down
22 changes: 22 additions & 0 deletions packages/stream_chat/lib/src/client/query_channels_result.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/core/models/predefined_filter.dart';

/// The result of a `queryChannelsWithResult` call on [StreamChatClient].
///
/// Carries the live [Channel] instances matching the query alongside the
/// server-resolved [PredefinedFilter] spec (when one is associated with the
/// query).
class QueryChannelsResult {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced this new model to serve as a container for the result of the StreamChatClient.queryChannelsWithResult.

/// Creates a new [QueryChannelsResult].
const QueryChannelsResult({
required this.channels,
this.predefinedFilter,
});

/// The live [Channel] instances matching the query.
final List<Channel> channels;

/// The server-resolved predefined-filter spec, or null when the query did
/// not use a `predefinedFilter`.
final PredefinedFilter? predefinedFilter;
}
Loading
Loading