Skip to content

Conversation

@naoking158
Copy link

お忙しい中恐縮ですが、ご確認のほどよろしくお願いいたします。

実行コード
```sh
$ ruff check --fix; ruff format
```
このグラフには日ごとの
- 消費量の総量
- 中央値と 10-90%-ile
がプロットされる。
このグラフにはエリア別に日ごとの
- 消費量の総量
- 中央値と 10-90%-ile
がプロットされる。
このグラフには特定ユーザーの日ごとの消費量の総量と、
そのユーザーが属するエリアの日ごとの消費量の中央値がプロットされる。
Copy link

@kmamoru kmamoru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

チャレンジの提出ありがとうございます!
全体的にコードも読みやすく、関数も大きすぎずまとまっていたと思います
いくつかコメントや質問を残していますが、お手隙の時にご確認ください

Comment on lines +26 to +30
# bulk_update では Model.save() が呼ばれないため、updated_at が更新されない
# この対処として、objects を差し替える
# ref: https://scrapbox.io/shimizukawa/django_bulk_update_%E6%99%82%E3%81%ABupdated_at%E3%82%92%E6%9B%B4%E6%96%B0%E3%81%99%E3%82%8B
objects = models.manager.BaseManager.from_queryset(QuerySet)()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bulk_updateのupdated_at更新の工夫、なるほどと思いました 👍


def handle(self, *args, **options):
print("Implement me!")
data_dir = Path(settings.BASE_DIR).parent / 'data'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pathlib使ってるの素晴らしいと思います!

return users_to_create, users_to_update


def import_user_data(csv_file_path, batch_size=10000):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

【質問】
データを取り込んだ際にcsvの内容が正しくない場合エラーを吐くことがあると思います
その場合の調査を簡単にする、またはエラー時の対応を楽にする方法で何か工夫できる点はあるでしょうか?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

想定され得るエラーに対して try-except を用いてエラーハンドリングすべきだと思います。

私の知る限り想定されるエラーは以下2種類です。

  • 文字コード由来のエンコードエラー UnicodeDecodeError
  • ファイル暗号化由来のエンコードエラー UnicodeDecodeError

上記エラーへの対処と、それ以外のエラーをキャッチしてログに残す関数を下記に示します。
pd.read_csv をこの関数で置き換えることで調査・対応が楽になると思います。

def load_csv(path: Path) -> pd.DataFrame:
    if not path.exists():
        raise FileNotFoundError(f'csv not found: {path}')

    encodings = ["utf-8-sig", "cp932", "shift_jis", "euc-jp", "iso-2022-jp"]
    for encoding in encodings:
        try:
            df = pd.read_csv(path, encoding=encoding, engine="python")
            return df
        except UnicodeDecodeError:
            # 現在の文字コード `encoding` で開けなかった場合、次の encoding を試す 
            continue
        except Exception as e:
            # Exception の場合に付加情報が不要であれば、このブロックは不要
            raise Exception(e)

    # ファイルが暗号化されている場合も `UnicodeDecodeError` が発生するが、前述の処理では素通りしてしまう。
    # encodings で列挙した文字コードで開けなかった場合はファイル暗号化の可能性が残るためここで raise する。
    raise UnicodeDecodeError(f'cs may be encypted: {path}')

補足: utf-8 ではなく utf-8-sig を使用する理由

utf-8-sig は BOM 付き UTF-8 に対応した文字コード。

BOM 付き UTF-8 のファイルを文字コード utf-8 で読み込むこともできる。
しかし、テキスト先頭に BOM が残ってしまうため、例えば CSV の列名を指定して処理する際に文字列が一致しなくてバグが発生する可能性がある。

一方、BOM の無い UTF-8 のファイルを文字コード utf-8-sig で読み込んだ場合、
先頭に BOM が無いためテキストはそのまま読み込まれる。
(参考: encodings.utf_8_sig — UTF-8 codec with BOM signature)

したがって、UTF-8 を想定した文字コードには utf-8-sig を指定しておけば安全だと言える。

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

早速の回答ありがとうございます!
ログを出したり、ファイルそのものの処理を飛ばすなどの工夫は有用だと思います!

from pathlib import Path
from typing import Any, Iterable

import pandas as pd
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

csv操作でpandsを使っているの素晴らしいと思いました!

)

with transaction.atomic():
for i in range(0, len(consumption_data_to_create), batch_size):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

range(0, -9940, 10000)

python manage.py import実行時にユーザーが取り込まれませんでした...
ユーザー数60件のcsvだとこのfor文内は実行されなそうです

【質問】
どのように修正できそうでしょうか?

Copy link
Author

@naoking158 naoking158 Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

すみません、バッチ処理をしている箇所は全て正しくインデックスを扱えていませんでした。

下記コミットで修正対応しました。
307fad6

以下、一部抜粋ですが、len(users_to_create) - batch_size とするのではなく、
len(users_to_create) // batch_size とすることで、データ数が batch_size より小さい場合にも対応できるようにしました。

-    with transaction.atomic():
-        for i in range(0, len(users_to_create) - batch_size, batch_size):
-            if len(users_to_create) - i >= batch_size:
-                User.objects.bulk_create(users_to_create[i : i + batch_size])
-            else:
-                User.objects.bulk_create(users_to_create[i:])
 
-        for i in range(0, len(users_to_create) - batch_size, batch_size):
-            if len(users_to_update) - i >= batch_size:
-                User.objects.bulk_update(users_to_update[i : i + batch_size], ['area', 'tariff'])
-            else:
-                User.objects.bulk_update(users_to_update[i:], ['area', 'tariff'])
+    with transaction.atomic():
+        for i in range(len(users_to_create) // batch_size + 1):
+            # IndexError が発生しないように処理をスキップ
+            if i * batch_size == len(users_to_create):
+                continue
+            User.objects.bulk_create(users_to_create[i * batch_size : (i + 1) * batch_size])
+
+        for i in range(len(users_to_update) // batch_size + 1):
+            # IndexError が発生しないように処理をスキップ
+            if i * batch_size == len(users_to_update):
+                continue
+            User.objects.bulk_update(
+                users_to_update[i * batch_size : (i + 1) * batch_size], ['area', 'tariff']

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修正ありがとうございます!
rangeの扱いって癖ありますよね...
修正いただいた対応でも問題ないと思いましたが同様の処理は以下でも実装できるかと思います!
(Djangoのbulk_create, bulk_updateのソースコード参照です!)

User.objects.bulk_create(users_to_create, batch_size=batch_size)
User.objects.bulk_update(users_to_update, fields=['area', 'tariff'], batch_size=batch_size)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API reference では第二引数に記載されているのになぜか見落としていました、お恥ずかしい限りです…

Comment on lines +152 to +158
consumption_data_to_create, consumption_data_to_update = (
make_consumption_data_list_to_create_and_update(
combined_df=combined_df,
existing_consumptions=existing_consumptions,
existing_users=existing_users,
)
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

新規作成と更新を分けているところがGoodだと思います!

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

サマリーページ
image

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

詳細
image

Copy link

@shira-182 shira-182 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Challengeご提出ありがとうございます!
限られた時間にも関わらず、要件を満たしたコードをしっかりと書いていただけてると思いました!

理由は処理速度を優先するためである。

本アプリケーションでは数値の厳密さは重要ではないため、
処理速度を落としてまで `DecimalField` を選択する必要は無いと考えられる。

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

型の比較ありがとうございます!
同意です👍

)


def plot_total_consumption(df: pd.DataFrame, percentiles: pd.DataFrame) -> Figure:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

型ヒントしっかり書かれててgoodです👍

class User(BaseModel):
id = models.IntegerField(primary_key=True, help_text='ユーザID')
area = models.CharField(max_length=3, help_text='エリア')
tariff = models.CharField(max_length=3, help_text='関税')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[FYI]命名が分かりにくいのですが、tariffは電気料金プランのことを指してます。

from consumption.models import Consumption, User


class StatisticsTests(TestCase):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

カスタムコマンドのテストもあるとベストです!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

仰る通りです。
インデックスや条件分岐が現れる箇所では特にテストが重要だと思います。
今回はそのテストが無く、バッチ処理がバグっていてテストの重要性を痛感しております…

psycopg2==2.9.3
pandas==2.2.2
matplotlib==3.9.1
ruff==0.5.0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ruffの選択いいですね👏

Copy link
Contributor

@t-miyao t-miyao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ご対応ありがとうございました!
いくつかコメントしましたが基本IMOのため修正は不要です。

users_to_update.append(user)
else:
users_to_create.append(User(id=row['id'], area=row['area'], tariff=row['tariff']))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

これくらいの処理であればfor文使わずともpandasのAPI操作だけでできるかなと思いました。

df_user_has_id = df[df.id.isin(existing_users)].to_dict(orient="records")
df_user_has_no_id = df[~df.id.isin(existing_users)].to_dict(orient="records")
 
users_to_create = [User(**user) for user in df_user_has_id]
users_to_update = [User(**user) for user in df_user_has_no_id]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ありがとうございます、勉強になります 🙇


# 列名のチェック
if not all(column in df.columns for column in ['id', 'area', 'tariff']):
raise ValueError("CSV file must contain 'id', 'area', and 'tariff' columns")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CSVのカラム形式が不正だったときのチェック良いと思います!
中身の型チェックまでできたら尚良しですね。

continue
User.objects.bulk_update(
users_to_update[i * batch_size : (i + 1) * batch_size], ['area', 'tariff']
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_sizeを可変にした理由はなんでしょうか?
ここは固定でも良い気がします。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

通信帯域やメモリなど、環境によって適切な batch_size は変わり得ると考え、可変にしました。
また、マジックナンバーは極力記載すべきではないと考えているので、固定値で十分な場合でも変数で持たせるようにしています。

df['user_id'] = int(user_id)
all_dfs.append(df)

combined_df = pd.concat(all_dfs, ignore_index=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1ファイルずつ処理ではなく全てまとめて処理は良いと思います!
ただ大きいファイルや大量ファイルが来た場合にメモリが枯渇する可能性があるのでそこは注意ですね。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

メモリへの配慮が漏れていました。
その点も考慮して分割処理すべきでした。

datetime=row['datetime'],
consumption=row['consumption'],
)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

こちらもfor文ではなくpandasのAPI操作だけでできるかと思います。

# 既存の消費データを一括取得
existing_data = Consumption.objects.filter(
user_id__in=user_ids, datetime__in=combined_df['datetime'].tolist()
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combined_dfは同じdatetimeが存在している可能性があるため、combined_df_df['datetime'].tolist().unique()にした方がin句が短くなるかなと思いました。

updated_at = models.DateTimeField(auto_now=True, blank=True, null=True)

# bulk_update では Model.save() が呼ばれないため、updated_at が更新されない
# この対処として、objects を差し替える
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

単純にbulk_updateの際にupdate_atをカラムとして持たせてあげれば良いのかなと思いました。

UPDATE_FIELDS = ["id", "area", "tariff" "updated_at"]
MODEL.objects.bulk_update(updates, fields=UPDATE_FIELDS)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

たしかに今回のケースではその方がシンプルで良いかもしれません。

ただ、将来的に別の場所で bulk_update を実行する場合や
Model.save() が呼ばれない QuerySet.update を実行するときに、
updated_at に関する処理を忘れるリスクを考え Hook として実装しておくのがベターだと考えました。

return f'User {self.id} - Area: {self.area} - Tariff: {self.tariff}'


class Consumption(models.Model):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumptionもBaseModel継承で良いのかなと思いました。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ご指摘の通りです。
こちら継承ミスですね…


class Meta:
constraints = [
models.UniqueConstraint(fields=['user', 'datetime'], name='unique_user_datetime')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

user と datetime で複合ユニークとしているの素晴らしいです。

@@ -0,0 +1,163 @@
import base64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import の並び順が、pep8 準拠で良いです。

  • 標準ライブラリ
  • サードパーティに関連するもの
  • ローカルな アプリケーション/ライブラリ に特有のもの

)


class Command(BaseCommand):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

実装不要でお考えだけでお聞かせいただければ。
バッチを手動でなく定期バッチで処理される場合、実行ログを残したい。
この時、コード上のどの位置にどのようなログを仕込むのがよいか。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

バッチ処理全体の開始前と開始後にその旨が分かるようなログと、
進捗が分かるように bulk_update などの直前にログを仕込むのが良いと思います。

existing_users = User.objects.in_bulk(df['id'].tolist())
users_to_create, users_to_update = make_user_list_to_create_and_update(df, existing_users)

with transaction.atomic():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修正不要でお考えだけでお聞かせいただければ。
トランザクションを意識されていて良いです。
ですが、現状ですとユーザの登録変更が成功した後で使用量データの登録変更の途中でエラーが発生すると使用量データの登録変更のみロールバックされてしまう実装になっているように見えます。

ユーザの登録変更、使用量データの登録変更を1連の処理と考えた時、transaction.atomic() をどこに記入すべきでしょうか?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ユーザの登録変更、使用量データの登録変更を1連の処理と考えた時、transaction.atomic() をどこに記入すべきでしょうか?

現状の構成のままであれば、関数 handle の中で import_user_dataimport_all_consumption_data を囲むように transaction.atomic() を記入すれば達成できると思います。
ただ、データベースとは関係のない処理も含んでしまうため、先に登録・変更データをまとめておき、
最後にユーザの登録変更、使用量データの登録変更をまとめて行えるように構成を変えるべきだと思います。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants