Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import sqlite3
from sqlite3 import Connection
from typing import List, Optional, Dict
from pathlib import Path

import exporter
from models import Message, Chat

import os


def query_messages_from_table_messages(con: Connection, key_remote_jid: str, contacts: Dict[str, Optional[str]]) -> List[Message]:
cur = con.cursor()
Expand All @@ -14,7 +17,26 @@ def query_messages_from_table_messages(con: Connection, key_remote_jid: str, con
FROM messages
WHERE key_remote_jid =:key_remote_jid
ORDER BY max(receipt_server_timestamp, received_timestamp)"""

query = """
SELECT * FROM (
SELECT received_timestamp, remote_resource, key_from_me, data, media_caption, media_wa_type
FROM messages
WHERE key_remote_jid =:key_remote_jid
ORDER BY max(receipt_server_timestamp, received_timestamp)
)
UNION
SELECT call_log.timestamp received_timestamp, jid.raw_string remote_resource, from_me key_from_me, NULL data, '' media_caption,
CASE video_call
WHEN 0 THEN 255
WHEN 1 THEN 256
END
AS media_wa_type
FROM call_log
INNER JOIN jid
ON call_log.jid_row_id = jid._id
WHERE raw_string =:key_remote_jid
ORDER BY received_timestamp
"""
messages = []
for received_timestamp, remote_resource, key_from_me, data, media_caption, media_wa_type in cur.execute(query, {"key_remote_jid": key_remote_jid}):
messages.append(
Expand Down Expand Up @@ -86,13 +108,40 @@ def query_contacts(db_path: str) -> Dict[str, Optional[str]]:
con.close()
return contacts

def check_paths(config: configparser.ConfigParser):
if config["input"].getboolean("use_wa_db"):
wa_path = Path(config["input"]['wa_path'].strip('"'))
config["input"]['wa_path'] = str(wa_path)
if not wa_path.exists():
raise Exception(f'Config wa_path does not exist: {wa_path}')

msgstore_path = Path(config["input"].get("msgstore_path").strip('"'))
config["input"]['msgstore_path'] = str(msgstore_path)
if not msgstore_path.exists():
raise Exception(f'Config msgstore_path does not exist: {msgstore_path}')

if config["output"].getboolean("export_html"):
html_output_path = Path(config["output"].get("html_output_path").strip('"'))
config["output"]['html_output_path'] = str(html_output_path)
html_output_dir = Path(os.path.dirname(html_output_path))
if not html_output_dir.exists():
html_output_dir.mkdir()

if config["output"].getboolean("export_txt"):
txt_output_directory_path = Path(config["output"].get("txt_output_directory_path").strip('"'))
config["output"]['txt_output_directory_path'] = str(txt_output_directory_path)
if not txt_output_directory_path.exists():
txt_output_directory_path.mkdir()

def main():
print("### WhatsApp Database Exporter ###")

config = configparser.ConfigParser()
config.read("config.cfg")

print("[+] Reading Config")
check_paths(config)

print("[+] Reading Database")
if config["input"].getboolean("use_wa_db"):
contacts = query_contacts(config["input"].get("wa_path"))
Expand Down
4 changes: 4 additions & 0 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def get_content(self) -> str:
return f"[Document] {media_caption}"
elif self.media_wa_type == 16:
return f"[Live Location] {media_caption}"
elif self.media_wa_type == 255:
return f"[AUDIO CALL] {media_caption}"
elif self.media_wa_type == 256:
return f"[VIDEO CALL] {media_caption}"
else:
return self.data

Expand Down