Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ analyzer:
avoid_catching_errors: ignore
document_ignores: ignore
one_member_abstracts: ignore
cascade_invocations: ignore
exclude:
- build/**
linter:
Expand Down
8 changes: 8 additions & 0 deletions lib/src/config/app_dependencies.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:flutter_news_app_api_server_full_source_code/src/config/environm
import 'package:flutter_news_app_api_server_full_source_code/src/rbac/permission_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/auth_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/auth_token_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/country_query_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/dashboard_summary_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/database_seeding_service.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/default_user_preference_limit_service.dart';
Expand Down Expand Up @@ -69,6 +70,7 @@ class AppDependencies {
late final PermissionService permissionService;
late final UserPreferenceLimitService userPreferenceLimitService;
late final RateLimitService rateLimitService;
late final CountryQueryService countryQueryService;

/// Initializes all application dependencies.
///
Expand Down Expand Up @@ -238,6 +240,11 @@ class AppDependencies {
connectionManager: _mongoDbConnectionManager,
log: Logger('MongoDbRateLimitService'),
);
countryQueryService = CountryQueryService(
countryRepository: countryRepository,
log: Logger('CountryQueryService'),
cacheDuration: const Duration(minutes: 15), // Default cache duration
);

_isInitialized = true;
_log.info('Application dependencies initialized successfully.');
Expand All @@ -255,6 +262,7 @@ class AppDependencies {
await _mongoDbConnectionManager.close();
tokenBlacklistService.dispose();
rateLimitService.dispose();
countryQueryService.dispose(); // Dispose the new service
_isInitialized = false;
_log.info('Application dependencies disposed.');
}
Expand Down
28 changes: 22 additions & 6 deletions lib/src/registry/data_operation_registry.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:core/core.dart';
import 'package:dart_frog/dart_frog.dart';
import 'package:data_repository/data_repository.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/config/app_dependencies.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/middlewares/ownership_check_middleware.dart';
import 'package:flutter_news_app_api_server_full_source_code/src/services/dashboard_summary_service.dart';

Expand Down Expand Up @@ -128,12 +129,27 @@ class DataOperationRegistry {
sort: s,
pagination: p,
),
'country': (c, uid, f, s, p) => c.read<DataRepository<Country>>().readAll(
userId: uid,
filter: f,
sort: s,
pagination: p,
),
'country': (c, uid, f, s, p) async {
final countryQueryService =
AppDependencies.instance.countryQueryService;
// Check for special filters that require aggregation
if (f != null &&
(f.containsKey('hasActiveSources') ||
f.containsKey('hasActiveHeadlines'))) {
return countryQueryService.getFilteredCountries(
filter: f,
pagination: p,
sort: s,
);
}
// Fallback to standard readAll if no special filters are present
return c.read<DataRepository<Country>>().readAll(
userId: uid,
filter: f,
sort: s,
pagination: p,
);
},
'language': (c, uid, f, s, p) => c
.read<DataRepository<Language>>()
.readAll(userId: uid, filter: f, sort: s, pagination: p),
Expand Down
306 changes: 306 additions & 0 deletions lib/src/services/country_query_service.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
import 'dart:async';
import 'dart:collection'; // Added for SplayTreeMap
import 'dart:convert';

import 'package:core/core.dart';
import 'package:data_repository/data_repository.dart';
import 'package:logging/logging.dart';

/// {@template country_query_service}
/// A service responsible for executing complex queries on country data,
/// including filtering by active sources and headlines, and supporting
/// compound filters with text search.
///
/// This service also implements robust in-memory caching with a configurable
/// Time-To-Live (TTL) to optimize performance for frequently requested queries.
/// {@endtemplate}
class CountryQueryService {
/// {@macro country_query_service}
CountryQueryService({
required DataRepository<Country> countryRepository,
required Logger log,
Duration cacheDuration = const Duration(minutes: 15),
}) : _countryRepository = countryRepository,
_log = log,
_cacheDuration = cacheDuration {
_cleanupTimer = Timer.periodic(const Duration(minutes: 5), (_) {
_cleanupCache();
});
_log.info(
'CountryQueryService initialized with cache duration: $cacheDuration',
);
}
final DataRepository<Country> _countryRepository;
final Logger _log;
final Duration _cacheDuration;

final Map<String, ({PaginatedResponse<Country> data, DateTime expiry})>
_cache = {};
Timer? _cleanupTimer;
bool _isDisposed = false;

/// Retrieves a paginated list of countries based on the provided filters,
/// including special filters for active sources and headlines, and text search.
///
/// This method supports compound filtering by combining `q` (text search),
/// `hasActiveSources`, `hasActiveHeadlines`, and other standard filters.
/// Results are cached to improve performance.
///
/// - [filter]: A map containing query conditions. Special keys like
/// `hasActiveSources` and `hasActiveHeadlines` trigger aggregation logic.
/// The `q` key triggers a text search on country names.
/// - [pagination]: Optional pagination parameters.
/// - [sort]: Optional sorting options.
///
/// Throws [OperationFailedException] for unexpected errors during query
/// execution or cache operations.
Future<PaginatedResponse<Country>> getFilteredCountries({
required Map<String, dynamic> filter,
PaginationOptions? pagination,
List<SortOption>? sort,
}) async {
if (_isDisposed) {
_log.warning('Attempted to query on disposed service.');
throw const OperationFailedException('Service is disposed.');
}

final cacheKey = _generateCacheKey(filter, pagination, sort);
final cachedEntry = _cache[cacheKey];

if (cachedEntry != null && DateTime.now().isBefore(cachedEntry.expiry)) {
_log.finer('Returning cached result for key: $cacheKey');
return cachedEntry.data;
}

_log.info('Executing new query for countries with filter: $filter');
try {
final pipeline = _buildAggregationPipeline(filter, pagination, sort);
final aggregationResult = await _countryRepository.aggregate(
pipeline: pipeline,
);

// MongoDB aggregation returns a list of maps. We need to convert these
// back into Country objects.
final countries = aggregationResult.map(Country.fromJson).toList();

// For aggregation queries, pagination and hasMore need to be handled
// manually if not directly supported by the aggregation stages.
// For simplicity, we'll assume the aggregation pipeline handles limit/skip
// and we'll determine hasMore based on if we fetched more than the limit.
final limit = pagination?.limit ?? countries.length;
final hasMore = countries.length > limit;
final paginatedCountries = countries.take(limit).toList();

final response = PaginatedResponse<Country>(
items: paginatedCountries,
cursor: null, // Aggregation doesn't typically return a cursor directly
hasMore: hasMore,
);

_cache[cacheKey] = (
data: response,
expiry: DateTime.now().add(_cacheDuration),
);
_log.finer('Cached new result for key: $cacheKey');

return response;
} on HttpException {
rethrow; // Propagate known HTTP exceptions
} catch (e, s) {
_log.severe('Error fetching filtered countries: $e', e, s);
throw OperationFailedException(
'Failed to retrieve filtered countries: $e',
);
}
}

/// Builds the MongoDB aggregation pipeline based on the provided filters.
List<Map<String, dynamic>> _buildAggregationPipeline(
Map<String, dynamic> filter,
PaginationOptions? pagination,
List<SortOption>? sort,
) {
final pipeline = <Map<String, dynamic>>[];
final compoundMatchStages = <Map<String, dynamic>>[];

// --- Stage 1: Initial Match for active status (if applicable) ---
// All countries should be active by default for these queries
compoundMatchStages.add({'status': ContentStatus.active.name});

// --- Stage 2: Handle `hasActiveSources` filter ---
if (filter['hasActiveSources'] == true) {
pipeline.add({
r'$lookup': {
'from': 'sources',
'localField': '_id',
'foreignField': 'headquarters._id',
'as': 'matchingSources',
},
});
pipeline.add({
r'$match': {
'matchingSources': {
r'$ne': <dynamic>[],
}, // Ensure there's at least one source
'matchingSources.status': ContentStatus.active.name,
},
});
}

// --- Stage 3: Handle `hasActiveHeadlines` filter ---
if (filter['hasActiveHeadlines'] == true) {
pipeline.add({
r'$lookup': {
'from': 'headlines',
'localField': '_id',
'foreignField': 'eventCountry._id',
'as': 'matchingHeadlines',
},
});
pipeline.add({
r'$match': {
'matchingHeadlines': {
r'$ne': <dynamic>[],
}, // Ensure there's at least one headline
'matchingHeadlines.status': ContentStatus.active.name,
},
});
}

// --- Stage 4: Handle `q` (text search) filter ---
final qValue = filter['q'];
if (qValue is String && qValue.isNotEmpty) {
compoundMatchStages.add({
r'$text': {r'$search': qValue},
});
}

// --- Stage 5: Handle other standard filters ---
filter.forEach((key, value) {
if (key != 'q' &&
key != 'hasActiveSources' &&
key != 'hasActiveHeadlines') {
compoundMatchStages.add({key: value});
}
});

// Combine all compound match stages
if (compoundMatchStages.isNotEmpty) {
pipeline.add({
r'$match': {r'$and': compoundMatchStages},
});
}

// --- Stage 6: Project to original Country structure and ensure uniqueness ---
// After lookups and matches, we might have duplicate countries if they
// matched multiple sources/headlines. We need to group them back to unique countries.
pipeline.add({
r'$group': {
'_id': r'$_id', // Group by the original country ID
'doc': {r'$first': r'$$ROOT'}, // Take the first full document
},
});
pipeline.add({
r'$replaceRoot': {
'newRoot': r'$doc', // Replace root with the original document
},
});

// --- Stage 7: Sorting ---
if (sort != null && sort.isNotEmpty) {
final sortStage = <String, dynamic>{};
for (final option in sort) {
sortStage[option.field] = option.order == SortOrder.asc ? 1 : -1;
}
pipeline.add({r'$sort': sortStage});
}

// --- Stage 8: Pagination (Skip and Limit) ---
if (pagination?.cursor != null) {
// For cursor-based pagination, we'd typically need a more complex
// aggregation that sorts by the cursor field and then skips.
// For simplicity, this example assumes offset-based pagination or
// that the client handles cursor logic.
_log.warning(
'Cursor-based pagination is not fully implemented for aggregation '
'queries in CountryQueryService. Only limit/skip is supported.',
);
}
if (pagination?.limit != null) {
// Fetch one more than the limit to determine 'hasMore'
pipeline.add({r'$limit': pagination!.limit! + 1});
}

// Project to match the Country model's JSON structure if necessary
// (e.g., if _id was used, map it back to id)
pipeline.add({
r'$project': {
'_id': 0, // Exclude _id
'id': {r'$toString': r'$_id'}, // Map _id back to id
'isoCode': r'$isoCode',
'name': r'$name',
'flagUrl': r'$flagUrl',
'createdAt': r'$createdAt',
'updatedAt': r'$updatedAt',
'status': r'$status',
// Ensure other fields are projected if they were modified or needed
},
});

return pipeline;
}

/// Generates a unique cache key from the query parameters.
String _generateCacheKey(
Map<String, dynamic> filter,
PaginationOptions? pagination,
List<SortOption>? sort,
) {
final sortedFilter = SplayTreeMap<String, dynamic>.from(filter);
final List<SortOption>? sortedSort;
if (sort != null) {
sortedSort = List<SortOption>.from(sort)
..sort((a, b) => a.field.compareTo(b.field));
} else {
sortedSort = null;
}

final keyData = {
'filter': sortedFilter,
'pagination': {'cursor': pagination?.cursor, 'limit': pagination?.limit},
'sort': sortedSort?.map((s) => '${s.field}:${s.order.name}').toList(),
};
return json.encode(keyData);
}

/// Cleans up expired entries from the in-memory cache.
void _cleanupCache() {
if (_isDisposed) return;

final now = DateTime.now();
final expiredKeys = <String>[];

_cache.forEach((key, value) {
if (now.isAfter(value.expiry)) {
expiredKeys.add(key);
}
});

if (expiredKeys.isNotEmpty) {
expiredKeys.forEach(_cache.remove);
_log.info('Cleaned up ${expiredKeys.length} expired cache entries.');
} else {
_log.finer('Cache cleanup ran, no expired entries found.');
}
}

/// Disposes of resources, specifically the periodic cache cleanup timer.
void dispose() {
if (!_isDisposed) {
_isDisposed = true;
_cleanupTimer?.cancel();
_cache.clear();
_log.info('CountryQueryService disposed.');
}
}
}
Loading
Loading