|
2 | 2 |
|
3 | 3 | import os |
4 | 4 | import warnings |
5 | | -from functools import partial |
6 | 5 | from logging import getLogger |
7 | 6 | from tempfile import TemporaryDirectory |
8 | | -from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Sequence |
| 7 | +from typing import TYPE_CHECKING, Any, Literal, Sequence |
9 | 8 |
|
10 | 9 | from snowflake.connector import ProgrammingError |
11 | 10 | from snowflake.connector.options import pandas |
@@ -572,99 +571,3 @@ async def drop_object(name: str, object_type: str) -> None: |
572 | 571 | sum(int(e[3]) for e in copy_results), |
573 | 572 | copy_results, |
574 | 573 | ) |
575 | | - |
576 | | - |
577 | | -def make_pd_writer( |
578 | | - **kwargs, |
579 | | -) -> Callable[ |
580 | | - [ |
581 | | - pandas.io.sql.SQLTable, |
582 | | - sqlalchemy.engine.Engine | sqlalchemy.engine.Connection, |
583 | | - Iterable, |
584 | | - Iterable, |
585 | | - Any, |
586 | | - ], |
587 | | - None, |
588 | | -]: |
589 | | - """This returns a pd_writer with the desired arguments. |
590 | | -
|
591 | | - Example usage: |
592 | | - import pandas as pd |
593 | | - from snowflake.connector.aio.pandas_tools import make_pd_writer |
594 | | -
|
595 | | - sf_connector_version_df = pd.DataFrame([('snowflake-connector-python', '1.0')], columns=['NAME', 'NEWEST_VERSION']) |
596 | | - # Note: SQLAlchemy async support would be needed for this to work with async connections |
597 | | - sf_connector_version_df.to_sql('driver_versions', engine, index=False, method=make_pd_writer()) |
598 | | -
|
599 | | - # to use parallel=1, quote_identifiers=False, |
600 | | - from functools import partial |
601 | | - sf_connector_version_df.to_sql( |
602 | | - 'driver_versions', engine, index=False, method=make_pd_writer(parallel=1, quote_identifiers=False)) |
603 | | -
|
604 | | - This function takes arguments used by 'pd_writer' (excluding 'table', 'conn', 'keys', and 'data_iter') |
605 | | - Please refer to 'pd_writer' for documentation. |
606 | | - """ |
607 | | - if any(arg in kwargs for arg in ("table", "conn", "keys", "data_iter")): |
608 | | - raise ProgrammingError( |
609 | | - "Arguments 'table', 'conn', 'keys', and 'data_iter' are not supported parameters for make_pd_writer." |
610 | | - ) |
611 | | - |
612 | | - return partial(pd_writer, **kwargs) |
613 | | - |
614 | | - |
615 | | -async def pd_writer( |
616 | | - table: pandas.io.sql.SQLTable, |
617 | | - conn: sqlalchemy.engine.Engine | sqlalchemy.engine.Connection, |
618 | | - keys: Iterable, |
619 | | - data_iter: Iterable, |
620 | | - **kwargs, |
621 | | -) -> None: |
622 | | - """This is a wrapper on top of write_pandas to make it compatible with to_sql method in pandas. |
623 | | -
|
624 | | - Notes: |
625 | | - Please note that when column names in the pandas DataFrame consist strictly of lower case letters, the column names need to |
626 | | - be enquoted; otherwise `ProgrammingError` will be raised. |
627 | | -
|
628 | | - This is because `snowflake-sqlalchemy` does not enquote lower case column names when creating the table, but `pd_writer` enquotes the columns by default, |
629 | | - so the COPY INTO command looks for enquoted column names. |
630 | | -
|
631 | | - Future improvements will be made in the snowflake-sqlalchemy library. |
632 | | -
|
633 | | - Note: This async version requires async SQLAlchemy support. |
634 | | -
|
635 | | - Example usage: |
636 | | - import pandas as pd |
637 | | - from snowflake.connector.aio.pandas_tools import pd_writer |
638 | | -
|
639 | | - sf_connector_version_df = pd.DataFrame([('snowflake-connector-python', '1.0')], columns=['NAME', 'NEWEST_VERSION']) |
640 | | - # Note: Requires async SQLAlchemy engine |
641 | | - await sf_connector_version_df.to_sql('driver_versions', engine, index=False, method=pd_writer) |
642 | | -
|
643 | | - # when the column names consist of only lower case letters, enquote the column names |
644 | | - sf_connector_version_df = pd.DataFrame([('snowflake-connector-python', '1.0')], columns=['"name"', '"newest_version"']) |
645 | | - await sf_connector_version_df.to_sql('driver_versions', engine, index=False, method=pd_writer) |
646 | | -
|
647 | | - Args: |
648 | | - table: Pandas package's table object. |
649 | | - conn: SQLAlchemy engine object to talk to Snowflake. |
650 | | - keys: Column names that we are trying to insert. |
651 | | - data_iter: Iterator over the rows. |
652 | | -
|
653 | | - More parameters can be provided to be used by 'write_pandas' (excluding 'conn', 'df', 'table_name', and 'schema'), |
654 | | - Please refer to 'write_pandas' for documentation on other available parameters. |
655 | | - """ |
656 | | - if any(arg in kwargs for arg in ("conn", "df", "table_name", "schema")): |
657 | | - raise ProgrammingError( |
658 | | - "Arguments 'conn', 'df', 'table_name', and 'schema' are not supported parameters for pd_writer." |
659 | | - ) |
660 | | - |
661 | | - sf_connection = conn.connection.connection |
662 | | - df = pandas.DataFrame(data_iter, columns=keys) |
663 | | - await write_pandas( |
664 | | - conn=sf_connection, |
665 | | - df=df, |
666 | | - # Note: Our sqlalchemy connector creates tables case insensitively |
667 | | - table_name=table.name.upper(), |
668 | | - schema=table.schema, |
669 | | - **kwargs, |
670 | | - ) |
0 commit comments