Implement country aggregation service #53
Merged
Commits (13, all by fulleni):
- bda2264 build(dependencies): add collection package
- ae69233 feat(service): implement CountryQueryService for complex country data…
- 55d2855 feat(services): add country query service
- b659fb8 feat(country): implement special filtering for country data operation
- 6a22180 style: format misc
- 697cf26 perf(country): optimize database queries and caching
- f3242e6 feat(config): add country service cache duration configuration
- 507acf3 docs(env): add configuration for country service cache duration
- 450f9a4 feat(config): make country service cache duration configurable
- 0a978b1 refactor(country): inject CountryQueryService via provider
- 39eea59 style: misc
- 644ac68 refactor(country): optimize country query pipeline and cache management
- 04d30c6 refactor(country): optimize aggregation pipeline in getActiveCountries
@@ -0,0 +1,306 @@

```dart
import 'dart:async';
import 'dart:collection'; // Provides SplayTreeMap for stable cache keys.
import 'dart:convert';

import 'package:core/core.dart';
import 'package:data_repository/data_repository.dart';
import 'package:logging/logging.dart';

/// {@template country_query_service}
/// A service responsible for executing complex queries on country data,
/// including filtering by active sources and headlines, and supporting
/// compound filters with text search.
///
/// This service also implements robust in-memory caching with a configurable
/// Time-To-Live (TTL) to optimize performance for frequently requested
/// queries.
/// {@endtemplate}
class CountryQueryService {
  /// {@macro country_query_service}
  CountryQueryService({
    required DataRepository<Country> countryRepository,
    required Logger log,
    Duration cacheDuration = const Duration(minutes: 15),
  })  : _countryRepository = countryRepository,
        _log = log,
        _cacheDuration = cacheDuration {
    _cleanupTimer = Timer.periodic(const Duration(minutes: 5), (_) {
      _cleanupCache();
    });
    _log.info(
      'CountryQueryService initialized with cache duration: $cacheDuration',
    );
  }

  final DataRepository<Country> _countryRepository;
  final Logger _log;
  final Duration _cacheDuration;

  final Map<String, ({PaginatedResponse<Country> data, DateTime expiry})>
      _cache = {};
  Timer? _cleanupTimer;
  bool _isDisposed = false;

  /// Retrieves a paginated list of countries based on the provided filters,
  /// including special filters for active sources and headlines, and text
  /// search.
  ///
  /// This method supports compound filtering by combining `q` (text search),
  /// `hasActiveSources`, `hasActiveHeadlines`, and other standard filters.
  /// Results are cached to improve performance.
  ///
  /// - [filter]: A map containing query conditions. Special keys like
  ///   `hasActiveSources` and `hasActiveHeadlines` trigger aggregation logic.
  ///   The `q` key triggers a text search on country names.
  /// - [pagination]: Optional pagination parameters.
  /// - [sort]: Optional sorting options.
  ///
  /// Throws [OperationFailedException] for unexpected errors during query
  /// execution or cache operations.
  Future<PaginatedResponse<Country>> getFilteredCountries({
    required Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  }) async {
    if (_isDisposed) {
      _log.warning('Attempted to query on disposed service.');
      throw const OperationFailedException('Service is disposed.');
    }

    final cacheKey = _generateCacheKey(filter, pagination, sort);
    final cachedEntry = _cache[cacheKey];

    if (cachedEntry != null && DateTime.now().isBefore(cachedEntry.expiry)) {
      _log.finer('Returning cached result for key: $cacheKey');
      return cachedEntry.data;
    }

    _log.info('Executing new query for countries with filter: $filter');
    try {
      final pipeline = _buildAggregationPipeline(filter, pagination, sort);
      final aggregationResult = await _countryRepository.aggregate(
        pipeline: pipeline,
      );

      // MongoDB aggregation returns a list of maps; convert these back into
      // Country objects.
      final countries = aggregationResult.map(Country.fromJson).toList();

      // When a limit is requested, the pipeline fetches one extra document,
      // so `hasMore` is true exactly when that extra document came back. It
      // is trimmed off before returning.
      final limit = pagination?.limit ?? countries.length;
      final hasMore = countries.length > limit;
      final paginatedCountries = countries.take(limit).toList();

      final response = PaginatedResponse<Country>(
        items: paginatedCountries,
        cursor: null, // Aggregation doesn't return a cursor directly.
        hasMore: hasMore,
      );

      _cache[cacheKey] = (
        data: response,
        expiry: DateTime.now().add(_cacheDuration),
      );
      _log.finer('Cached new result for key: $cacheKey');

      return response;
    } on HttpException {
      rethrow; // Propagate known HTTP exceptions.
    } catch (e, s) {
      _log.severe('Error fetching filtered countries: $e', e, s);
      throw OperationFailedException(
        'Failed to retrieve filtered countries: $e',
      );
    }
  }
```
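The `hasMore` computation pairs with the `$limit` stage built later in the pipeline: the query fetches one row more than the requested limit, and the presence of that extra row signals another page without a separate count query. A standalone sketch of the trimming logic, in plain Dart with no assumptions beyond the code above:

```dart
void main() {
  // The pipeline requests limit + 1 documents; an extra row means more pages.
  const limit = 3;
  final fetched = ['AD', 'AE', 'AF', 'AG']; // 4 rows returned for limit 3.
  final hasMore = fetched.length > limit; // true
  final page = fetched.take(limit).toList(); // [AD, AE, AF]
  print('items: $page, hasMore: $hasMore');
}
```

The pipeline builder itself follows.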
```dart
  /// Builds the MongoDB aggregation pipeline based on the provided filters.
  List<Map<String, dynamic>> _buildAggregationPipeline(
    Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  ) {
    final pipeline = <Map<String, dynamic>>[];
    final compoundMatchStages = <Map<String, dynamic>>[];

    // --- Stage 1: Initial match for active status ---
    // All countries must be active by default for these queries.
    compoundMatchStages.add({'status': ContentStatus.active.name});

    // --- Stage 2: Handle `hasActiveSources` filter ---
    if (filter['hasActiveSources'] == true) {
      pipeline.add({
        r'$lookup': {
          'from': 'sources',
          'localField': '_id',
          'foreignField': 'headquarters._id',
          'as': 'matchingSources',
        },
      });
      pipeline.add({
        r'$match': {
          'matchingSources': {
            r'$ne': <dynamic>[],
          }, // Ensure there's at least one source.
          'matchingSources.status': ContentStatus.active.name,
        },
      });
    }

    // --- Stage 3: Handle `hasActiveHeadlines` filter ---
    if (filter['hasActiveHeadlines'] == true) {
      pipeline.add({
        r'$lookup': {
          'from': 'headlines',
          'localField': '_id',
          'foreignField': 'eventCountry._id',
          'as': 'matchingHeadlines',
        },
      });
      pipeline.add({
        r'$match': {
          'matchingHeadlines': {
            r'$ne': <dynamic>[],
          }, // Ensure there's at least one headline.
          'matchingHeadlines.status': ContentStatus.active.name,
        },
      });
    }

    // --- Stage 4: Handle `q` (text search) filter ---
    final qValue = filter['q'];
    if (qValue is String && qValue.isNotEmpty) {
      compoundMatchStages.add({
        r'$text': {r'$search': qValue},
      });
    }

    // --- Stage 5: Handle other standard filters ---
    filter.forEach((key, value) {
      if (key != 'q' &&
          key != 'hasActiveSources' &&
          key != 'hasActiveHeadlines') {
        compoundMatchStages.add({key: value});
      }
    });

    // Combine all compound match stages.
    if (compoundMatchStages.isNotEmpty) {
      pipeline.add({
        r'$match': {r'$and': compoundMatchStages},
      });
    }

    // --- Stage 6: Deduplicate back to unique countries ---
    // After lookups and matches, a country may appear multiple times if it
    // matched several sources or headlines, so group on the original ID.
    pipeline.add({
      r'$group': {
        '_id': r'$_id', // Group by the original country ID.
        'doc': {r'$first': r'$$ROOT'}, // Take the first full document.
      },
    });
    pipeline.add({
      r'$replaceRoot': {
        'newRoot': r'$doc', // Replace root with the original document.
      },
    });

    // --- Stage 7: Sorting ---
    if (sort != null && sort.isNotEmpty) {
      final sortStage = <String, dynamic>{};
      for (final option in sort) {
        sortStage[option.field] = option.order == SortOrder.asc ? 1 : -1;
      }
      pipeline.add({r'$sort': sortStage});
    }

    // --- Stage 8: Pagination ---
    if (pagination?.cursor != null) {
      // Cursor-based pagination would require sorting by the cursor field
      // and skipping past it; only a `$limit` stage is applied here.
      _log.warning(
        'Cursor-based pagination is not implemented for aggregation '
        'queries in CountryQueryService; only a limit is applied.',
      );
    }
    if (pagination?.limit != null) {
      // Fetch one more than the limit so the caller can determine `hasMore`.
      pipeline.add({r'$limit': pagination!.limit! + 1});
    }

    // Project back to the Country model's JSON structure (map `_id` to `id`).
    pipeline.add({
      r'$project': {
        '_id': 0, // Exclude the raw ObjectId.
        'id': {r'$toString': r'$_id'}, // Map _id back to id.
        'isoCode': r'$isoCode',
        'name': r'$name',
        'flagUrl': r'$flagUrl',
        'createdAt': r'$createdAt',
        'updatedAt': r'$updatedAt',
        'status': r'$status',
      },
    });

    return pipeline;
  }
```
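For concreteness, this is the pipeline the method above should produce for the filter `{'hasActiveSources': true, 'q': 'france'}` with a limit of 10, sketched under the assumption that `ContentStatus.active.name` is `'active'`; the trailing `$project` stage is elided since it appears verbatim above. One caveat worth flagging: MongoDB only accepts a `$match` containing `$text` as the first stage of a pipeline, so combining `q` with an active-source `$lookup` in this order may be rejected by the server unless the text match is hoisted to the front.

```dart
// Hypothetical expected output of _buildAggregationPipeline for
// filter: {'hasActiveSources': true, 'q': 'france'}, limit: 10.
final expectedPipeline = <Map<String, dynamic>>[
  {
    r'$lookup': {
      'from': 'sources',
      'localField': '_id',
      'foreignField': 'headquarters._id',
      'as': 'matchingSources',
    },
  },
  {
    r'$match': {
      'matchingSources': {r'$ne': <dynamic>[]},
      'matchingSources.status': 'active', // assumes ContentStatus.active.name
    },
  },
  {
    r'$match': {
      r'$and': [
        {'status': 'active'},
        {
          r'$text': {r'$search': 'france'},
        },
      ],
    },
  },
  {
    r'$group': {
      '_id': r'$_id',
      'doc': {r'$first': r'$$ROOT'},
    },
  },
  {
    r'$replaceRoot': {'newRoot': r'$doc'},
  },
  {r'$limit': 11}, // limit + 1, so the caller can compute `hasMore`.
  // ...followed by the $project stage shown above.
];
```

Cache key generation comes next.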
```dart
  /// Generates a unique cache key from the query parameters.
  ///
  /// Filter keys and sort options are sorted first so that logically
  /// identical queries always produce the same key.
  String _generateCacheKey(
    Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  ) {
    final sortedFilter = SplayTreeMap<String, dynamic>.from(filter);
    final List<SortOption>? sortedSort;
    if (sort != null) {
      sortedSort = List<SortOption>.from(sort)
        ..sort((a, b) => a.field.compareTo(b.field));
    } else {
      sortedSort = null;
    }

    final keyData = {
      'filter': sortedFilter,
      'pagination': {'cursor': pagination?.cursor, 'limit': pagination?.limit},
      'sort': sortedSort?.map((s) => '${s.field}:${s.order.name}').toList(),
    };
    return json.encode(keyData);
  }
```
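Because `SplayTreeMap` iterates its keys in sorted order and `json.encode` serializes a map in iteration order, two filters that differ only in key order produce the same cache key. This can be verified with plain Dart:

```dart
import 'dart:collection';
import 'dart:convert';

void main() {
  // Same filter, different insertion order.
  final a = SplayTreeMap<String, dynamic>.from(
    {'q': 'fr', 'hasActiveSources': true},
  );
  final b = SplayTreeMap<String, dynamic>.from(
    {'hasActiveSources': true, 'q': 'fr'},
  );
  // Both encode as {"hasActiveSources":true,"q":"fr"}.
  print(json.encode(a) == json.encode(b)); // true
}
```

The cache cleanup and disposal logic rounds out the service.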
```dart
  /// Cleans up expired entries from the in-memory cache.
  void _cleanupCache() {
    if (_isDisposed) return;

    final now = DateTime.now();
    final expiredKeys = <String>[];

    _cache.forEach((key, value) {
      if (now.isAfter(value.expiry)) {
        expiredKeys.add(key);
      }
    });

    if (expiredKeys.isNotEmpty) {
      expiredKeys.forEach(_cache.remove);
      _log.info('Cleaned up ${expiredKeys.length} expired cache entries.');
    } else {
      _log.finer('Cache cleanup ran, no expired entries found.');
    }
  }

  /// Disposes of resources, specifically the periodic cache cleanup timer.
  void dispose() {
    if (!_isDisposed) {
      _isDisposed = true;
      _cleanupTimer?.cancel();
      _cache.clear();
      _log.info('CountryQueryService disposed.');
    }
  }
}
```
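A minimal end-to-end usage sketch. The `PaginationOptions` constructor shape and the repository wiring are assumptions (those types live in the project's `core` and `data_repository` packages), so adapt the details to the real APIs:

```dart
import 'package:logging/logging.dart';

// Hypothetical wiring: `countryRepository` is assumed to be an
// already-configured DataRepository<Country>.
Future<void> runCountryQuery(DataRepository<Country> countryRepository) async {
  final service = CountryQueryService(
    countryRepository: countryRepository,
    log: Logger('CountryQueryService'),
    cacheDuration: const Duration(minutes: 5),
  );

  try {
    // Compound filter: countries with at least one active source whose
    // name matches the text search term.
    final page = await service.getFilteredCountries(
      filter: {'hasActiveSources': true, 'q': 'united'},
      pagination: const PaginationOptions(limit: 20), // assumed constructor
    );
    print('Got ${page.items.length} countries, hasMore: ${page.hasMore}');

    // An identical call within the TTL is served from the in-memory cache.
    await service.getFilteredCountries(
      filter: {'hasActiveSources': true, 'q': 'united'},
      pagination: const PaginationOptions(limit: 20),
    );
  } finally {
    // The periodic cleanup Timer keeps running until dispose() is called.
    service.dispose();
  }
}
```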