Skip to content

Commit e218254

Browse files
DeanChensjcopybara-github
authored andcommitted
feat: Add SqliteSessionService and a migration script to migrate existing DB using DatabaseSessionService to SqliteSessionService
The new Sqlite version has fixed schema and use a single column to store Event data, this should avoid DB migration for future add to the Event object. - This change introduces `SqliteSessionService`, an asynchronous session service using `aiosqlite` that stores event data as JSON within SQLite. - A migration script, `migrate_from_sqlalchemy_sqlite.py`, is included to transition data from the older SQLAlchemy-based SQLite schema to this new format. - The CLI service registry is updated to use SqliteSessionService for sqlite:// URIs. - Throw error when user trying to access a legacy DB and advice the user to do the migration. Co-authored-by: Shangjie Chen <deanchen@google.com> PiperOrigin-RevId: 829971174
1 parent 50ceda0 commit e218254

File tree

6 files changed

+850
-48
lines changed

6 files changed

+850
-48
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ classifiers = [ # List of https://pypi.org/classifiers/
2626
dependencies = [
2727
# go/keep-sorted start
2828
"PyYAML>=6.0.2, <7.0.0", # For APIHubToolset.
29+
"aiosqlite>=0.21.0", # For SQLite database
2930
"anyio>=4.9.0, <5.0.0;python_version>='3.10'", # For MCP Session Manager
3031
"authlib>=1.5.1, <2.0.0", # For RestAPI Tool
3132
"click>=8.1.8, <9.0.0", # For CLI tools

src/google/adk/cli/service_registry.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from ..artifacts.base_artifact_service import BaseArtifactService
2424
from ..memory.base_memory_service import BaseMemoryService
25+
from ..sessions import InMemorySessionService
2526
from ..sessions.base_session_service import BaseSessionService
2627

2728

@@ -170,8 +171,22 @@ def database_session_factory(uri: str, **kwargs):
170171
kwargs_copy.pop("agents_dir", None)
171172
return DatabaseSessionService(db_url=uri, **kwargs_copy)
172173

174+
def sqlite_session_factory(uri: str, **kwargs):
175+
from ..sessions.sqlite_session_service import SqliteSessionService
176+
177+
parsed = urlparse(uri)
178+
db_path = parsed.path
179+
if not db_path:
180+
return InMemorySessionService()
181+
elif db_path.startswith("/"):
182+
db_path = db_path[1:]
183+
kwargs_copy = kwargs.copy()
184+
kwargs_copy.pop("agents_dir", None)
185+
return SqliteSessionService(db_path=db_path, **kwargs_copy)
186+
173187
registry.register_session_service("agentengine", agentengine_session_factory)
174-
for scheme in ["sqlite", "postgresql", "mysql"]:
188+
registry.register_session_service("sqlite", sqlite_session_factory)
189+
for scheme in ["postgresql", "mysql"]:
175190
registry.register_session_service(scheme, database_session_factory)
176191

177192
# -- Artifact Services --
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Migration script from SQLAlchemy SQLite to the new SQLite JSON schema."""
15+
16+
from __future__ import annotations
17+
18+
import argparse
19+
from datetime import timezone
20+
import json
21+
import logging
22+
import sqlite3
23+
import sys
24+
25+
from google.adk.sessions import database_session_service as dss
26+
from google.adk.sessions import sqlite_session_service as sss
27+
from sqlalchemy import create_engine
28+
from sqlalchemy.orm import sessionmaker
29+
30+
logger = logging.getLogger("google_adk." + __name__)
31+
32+
33+
def migrate(source_db_url: str, dest_db_path: str):
34+
"""Migrates data from a SQLAlchemy-based SQLite DB to the new schema."""
35+
logger.info(f"Connecting to source database: {source_db_url}")
36+
try:
37+
engine = create_engine(source_db_url)
38+
dss.Base.metadata.create_all(engine) # Ensure tables exist for inspection
39+
SourceSession = sessionmaker(bind=engine)
40+
source_session = SourceSession()
41+
except Exception as e:
42+
logger.error(f"Failed to connect to source database: {e}")
43+
sys.exit(1)
44+
45+
logger.info(f"Connecting to destination database: {dest_db_path}")
46+
try:
47+
dest_conn = sqlite3.connect(dest_db_path)
48+
dest_cursor = dest_conn.cursor()
49+
dest_cursor.execute(sss.PRAGMA_FOREIGN_KEYS)
50+
dest_cursor.executescript(sss.CREATE_SCHEMA_SQL)
51+
except Exception as e:
52+
logger.error(f"Failed to connect to destination database: {e}")
53+
sys.exit(1)
54+
55+
try:
56+
# Migrate app_states
57+
logger.info("Migrating app_states...")
58+
app_states = source_session.query(dss.StorageAppState).all()
59+
for item in app_states:
60+
dest_cursor.execute(
61+
"INSERT INTO app_states (app_name, state, update_time) VALUES (?,"
62+
" ?, ?)",
63+
(
64+
item.app_name,
65+
json.dumps(item.state),
66+
item.update_time.replace(tzinfo=timezone.utc).timestamp(),
67+
),
68+
)
69+
logger.info(f"Migrated {len(app_states)} app_states.")
70+
71+
# Migrate user_states
72+
logger.info("Migrating user_states...")
73+
user_states = source_session.query(dss.StorageUserState).all()
74+
for item in user_states:
75+
dest_cursor.execute(
76+
"INSERT INTO user_states (app_name, user_id, state, update_time)"
77+
" VALUES (?, ?, ?, ?)",
78+
(
79+
item.app_name,
80+
item.user_id,
81+
json.dumps(item.state),
82+
item.update_time.replace(tzinfo=timezone.utc).timestamp(),
83+
),
84+
)
85+
logger.info(f"Migrated {len(user_states)} user_states.")
86+
87+
# Migrate sessions
88+
logger.info("Migrating sessions...")
89+
sessions = source_session.query(dss.StorageSession).all()
90+
for item in sessions:
91+
dest_cursor.execute(
92+
"INSERT INTO sessions (app_name, user_id, id, state, create_time,"
93+
" update_time) VALUES (?, ?, ?, ?, ?, ?)",
94+
(
95+
item.app_name,
96+
item.user_id,
97+
item.id,
98+
json.dumps(item.state),
99+
item.create_time.replace(tzinfo=timezone.utc).timestamp(),
100+
item.update_time.replace(tzinfo=timezone.utc).timestamp(),
101+
),
102+
)
103+
logger.info(f"Migrated {len(sessions)} sessions.")
104+
105+
# Migrate events
106+
logger.info("Migrating events...")
107+
events = source_session.query(dss.StorageEvent).all()
108+
for item in events:
109+
try:
110+
event_obj = item.to_event()
111+
event_data = event_obj.model_dump_json(exclude_none=True)
112+
dest_cursor.execute(
113+
"INSERT INTO events (id, app_name, user_id, session_id,"
114+
" invocation_id, timestamp, event_data) VALUES (?, ?, ?, ?, ?,"
115+
" ?, ?)",
116+
(
117+
event_obj.id,
118+
item.app_name,
119+
item.user_id,
120+
item.session_id,
121+
event_obj.invocation_id,
122+
event_obj.timestamp,
123+
event_data,
124+
),
125+
)
126+
except Exception as e:
127+
logger.warning(f"Failed to migrate event {item.id}: {e}")
128+
logger.info(f"Migrated {len(events)} events.")
129+
130+
dest_conn.commit()
131+
logger.info("Migration completed successfully.")
132+
133+
except Exception as e:
134+
logger.error(f"An error occurred during migration: {e}", exc_info=True)
135+
dest_conn.rollback()
136+
sys.exit(1)
137+
finally:
138+
source_session.close()
139+
dest_conn.close()
140+
141+
142+
if __name__ == "__main__":
143+
parser = argparse.ArgumentParser(
144+
description=(
145+
"Migrate ADK sessions from an existing SQLAlchemy-based "
146+
"SQLite database to a new SQLite database with JSON events."
147+
)
148+
)
149+
parser.add_argument(
150+
"--source_db_path",
151+
required=True,
152+
help="Path to the source SQLite database file (e.g., /path/to/old.db)",
153+
)
154+
parser.add_argument(
155+
"--dest_db_path",
156+
required=True,
157+
help=(
158+
"Path to the destination SQLite database file (e.g., /path/to/new.db)"
159+
),
160+
)
161+
args = parser.parse_args()
162+
163+
source_url = f"sqlite:///{args.source_db_path}"
164+
migrate(source_url, args.dest_db_path)

0 commit comments

Comments
 (0)