|
9 | 9 | import codecs |
10 | 10 | from collections import defaultdict |
11 | 11 | from collections.abc import ( |
| 12 | + Callable, |
12 | 13 | Hashable, |
13 | 14 | Mapping, |
14 | 15 | Sequence, |
15 | 16 | ) |
16 | 17 | import dataclasses |
17 | 18 | import functools |
18 | 19 | import gzip |
| 20 | +from importlib.metadata import entry_points |
19 | 21 | from io import ( |
20 | 22 | BufferedIOBase, |
21 | 23 | BytesIO, |
|
90 | 92 |
|
91 | 93 | from pandas import MultiIndex |
92 | 94 |
|
| 95 | +# registry of I/O engines. It is populated the first time a non-core |
| 96 | +# pandas engine is used |
| 97 | +_io_engines: dict[str, Any] | None = None |
| 98 | + |
93 | 99 |
|
94 | 100 | @dataclasses.dataclass |
95 | 101 | class IOArgs: |
@@ -1282,3 +1288,149 @@ def dedup_names( |
1282 | 1288 | counts[col] = cur_count + 1 |
1283 | 1289 |
|
1284 | 1290 | return names |
| 1291 | + |
| 1292 | + |
| 1293 | +def _get_io_engine(name: str) -> Any: |
| 1294 | + """ |
| 1295 | + Return an I/O engine by its name. |
| 1296 | +
|
| 1297 | + pandas I/O engines can be registered via entry points. The first time this |
| 1298 | + function is called it will register all the entry points of the "pandas.io_engine" |
| 1299 | + group and cache them in the global `_io_engines` variable. |
| 1300 | +
|
| 1301 | + Engines are implemented as classes with the `read_<format>` and `to_<format>` |
| 1302 | + methods (classmethods) for the formats they wish to provide. This function will |
| 1303 | + return the method from the engine and format being requested. |
| 1304 | +
|
| 1305 | + Parameters |
| 1306 | + ---------- |
| 1307 | + name : str |
| 1308 | + The engine name provided by the user in `engine=<value>`. |
| 1309 | +
|
| 1310 | + Examples |
| 1311 | + -------- |
| 1312 | + An engine is implemented with a class like: |
| 1313 | +
|
| 1314 | + >>> class DummyEngine: |
| 1315 | + ... @classmethod |
| 1316 | + ... def read_csv(cls, filepath_or_buffer, **kwargs): |
| 1317 | + ... # the engine signature must match the pandas method signature |
| 1318 | + ... return pd.DataFrame() |
| 1319 | +
|
| 1320 | + It must be registered as an entry point with the engine name: |
| 1321 | +
|
| 1322 | + ``` |
| 1323 | + [project.entry-points."pandas.io_engine"] |
| 1324 | + dummy = "pandas:io.dummy.DummyEngine" |
| 1325 | +
|
| 1326 | + ``` |
| 1327 | +
|
| 1328 | + Then the `read_csv` method of the engine can be used with: |
| 1329 | +
|
| 1330 | + >>> _get_io_engine(engine_name="dummy").read_csv("myfile.csv") # doctest: +SKIP |
| 1331 | +
|
| 1332 | + This is used internally to dispatch the next pandas call to the engine caller: |
| 1333 | +
|
| 1334 | + >>> df = read_csv("myfile.csv", engine="dummy") # doctest: +SKIP |
| 1335 | + """ |
| 1336 | + global _io_engines |
| 1337 | + |
| 1338 | + if _io_engines is None: |
| 1339 | + _io_engines = {} |
| 1340 | + for entry_point in entry_points().select(group="pandas.io_engine"): |
| 1341 | + if entry_point.dist: |
| 1342 | + package_name = entry_point.dist.metadata["Name"] |
| 1343 | + else: |
| 1344 | + package_name = None |
| 1345 | + if entry_point.name in _io_engines: |
| 1346 | + _io_engines[entry_point.name]._packages.append(package_name) |
| 1347 | + else: |
| 1348 | + _io_engines[entry_point.name] = entry_point.load() |
| 1349 | + _io_engines[entry_point.name]._packages = [package_name] |
| 1350 | + |
| 1351 | + try: |
| 1352 | + engine = _io_engines[name] |
| 1353 | + except KeyError as err: |
| 1354 | + raise ValueError( |
| 1355 | + f"'{name}' is not a known engine. Some engines are only available " |
| 1356 | + "after installing the package that provides them." |
| 1357 | + ) from err |
| 1358 | + |
| 1359 | + if len(engine._packages) > 1: |
| 1360 | + msg = ( |
| 1361 | + f"The engine '{name}' has been registered by the package " |
| 1362 | + f"'{engine._packages[0]}' and will be used. " |
| 1363 | + ) |
| 1364 | + if len(engine._packages) == 2: |
| 1365 | + msg += ( |
| 1366 | + f"The package '{engine._packages[1]}' also tried to register " |
| 1367 | + "the engine, but it couldn't because it was already registered." |
| 1368 | + ) |
| 1369 | + else: |
| 1370 | + msg += ( |
| 1371 | + "The packages {str(engine._packages[1:]}[1:-1] also tried to register " |
| 1372 | + "the engine, but they couldn't because it was already registered." |
| 1373 | + ) |
| 1374 | + warnings.warn(msg, RuntimeWarning, stacklevel=find_stack_level()) |
| 1375 | + |
| 1376 | + return engine |
| 1377 | + |
| 1378 | + |
| 1379 | +def allow_third_party_engines( |
| 1380 | + skip_engines: list[str] | Callable | None = None, |
| 1381 | +) -> Callable: |
| 1382 | + """ |
| 1383 | + Decorator to avoid boilerplate code when allowing readers and writers to use |
| 1384 | + third-party engines. |
| 1385 | +
|
| 1386 | + The decorator will introspect the function to know which format should be obtained, |
| 1387 | + and to know if it's a reader or a writer. Then it will check if the engine has been |
| 1388 | + registered, and if it has, it will dispatch the execution to the engine with the |
| 1389 | + arguments provided by the user. |
| 1390 | +
|
| 1391 | + Parameters |
| 1392 | + ---------- |
| 1393 | + skip_engines : list of str, optional |
| 1394 | + For engines that are implemented in pandas, we want to skip them for this engine |
| 1395 | + dispatching system. They should be specified in this parameter. |
| 1396 | +
|
| 1397 | + Examples |
| 1398 | + -------- |
| 1399 | + The decorator works both with the `skip_engines` parameter, or without: |
| 1400 | +
|
| 1401 | + >>> class DataFrame: |
| 1402 | + ... @allow_third_party_engines(["python", "c", "pyarrow"]) |
| 1403 | + ... def read_csv(filepath_or_buffer, **kwargs): |
| 1404 | + ... pass |
| 1405 | + ... |
| 1406 | + ... @allow_third_party_engines |
| 1407 | + ... def read_sas(filepath_or_buffer, **kwargs): |
| 1408 | + ... pass |
| 1409 | + """ |
| 1410 | + |
| 1411 | + def decorator(func: Callable) -> Callable: |
| 1412 | + @functools.wraps(func) |
| 1413 | + def wrapper(*args: Any, **kwargs: Any) -> Any: |
| 1414 | + if callable(skip_engines) or skip_engines is None: |
| 1415 | + skip_engine = False |
| 1416 | + else: |
| 1417 | + skip_engine = kwargs["engine"] in skip_engines |
| 1418 | + |
| 1419 | + if "engine" in kwargs and not skip_engine: |
| 1420 | + engine_name = kwargs.pop("engine") |
| 1421 | + engine = _get_io_engine(engine_name) |
| 1422 | + try: |
| 1423 | + return getattr(engine, func.__name__)(*args, **kwargs) |
| 1424 | + except AttributeError as err: |
| 1425 | + raise ValueError( |
| 1426 | + f"The engine '{engine_name}' does not provide a " |
| 1427 | + f"'{func.__name__}' function" |
| 1428 | + ) from err |
| 1429 | + else: |
| 1430 | + return func(*args, **kwargs) |
| 1431 | + |
| 1432 | + return wrapper |
| 1433 | + |
| 1434 | + if callable(skip_engines): |
| 1435 | + return decorator(skip_engines) |
| 1436 | + return decorator |
0 commit comments