Skip to content

Commit f6c1672

Browse files
authored
Merge pull request #99 from mindsdb/add-last
LAST support
2 parents 99d9616 + 4320ab0 commit f6c1672

File tree

4 files changed

+72
-22
lines changed

4 files changed

+72
-22
lines changed

mindsdb_sdk/jobs.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,15 @@ def get(self, name: str) -> Job:
9898
else:
9999
raise RuntimeError("Several jobs with the same name")
100100

101-
def create(self, name: str, query_str: str,
102-
start_at: dt.datetime = None, end_at: dt.datetime = None,
103-
repeat_str: str = None) -> Union[Job, None]:
101+
def create(
102+
self,
103+
name: str,
104+
query_str: str,
105+
start_at: dt.datetime = None,
106+
end_at: dt.datetime = None,
107+
repeat_str: str = None,
108+
repeat_min: int = None,
109+
) -> Union[Job, None]:
104110
"""
105111
Create new job in project and return it.
106112
@@ -114,6 +120,7 @@ def create(self, name: str, query_str: str,
114120
:param start_at: datetime, first start of job,
115121
:param end_at: datetime, when job have to be stopped,
116122
:param repeat_str: str, optional, how to repeat job (e.g. '1 hour', '2 weeks', '3 min')
123+
:param repeat_min: int, optional, period to repeat the job in minutes
117124
:return: Job object or None
118125
"""
119126

@@ -126,6 +133,10 @@ def create(self, name: str, query_str: str,
126133
end_str = end_at.strftime("%Y-%m-%d %H:%M:%S")
127134
else:
128135
end_str = None
136+
137+
if repeat_min is not None:
138+
repeat_str = f'{repeat_min} minutes'
139+
129140
ast_query = CreateJob(
130141
name=Identifier(name),
131142
query_str=query_str,

mindsdb_sdk/tables.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import pandas as pd
66

77
from mindsdb_sql.parser.ast import DropTables
8-
from mindsdb_sql.parser.ast import Select, Star, Identifier, Constant, Delete, Insert, Update
8+
from mindsdb_sql.parser.ast import Select, Star, Identifier, Constant, Delete, Insert, Update, Last, BinaryOperation
99

10-
from mindsdb_sdk.utils.sql import dict_to_binary_op
10+
from mindsdb_sdk.utils.sql import dict_to_binary_op, add_condition
1111
from mindsdb_sdk.utils.objects_collection import CollectionBase
1212

1313
from .query import Query
@@ -20,6 +20,7 @@ def __init__(self, db, name):
2020
self.db = db
2121
self._filters = {}
2222
self._limit = None
23+
self._track_column = None
2324
self._update_query()
2425

2526
def _filters_repr(self):
@@ -45,6 +46,7 @@ def filter(self, **kwargs):
4546
'select * from table1 where a=1 and b=2'
4647
4748
:param kwargs: filter
49+
:return: Table object
4850
"""
4951
# creates new object
5052
query = copy.deepcopy(self)
@@ -57,17 +59,46 @@ def limit(self, val: int):
5759
Applies limit condition to table query
5860
5961
:param val: limit size
62+
:return: Table object
6063
"""
6164
query = copy.deepcopy(self)
6265
query._limit = val
6366
query._update_query()
6467
return query
6568

69+
def track(self, column):
70+
"""
71+
Apply tracking column to table. ('LAST' keyword in mindsdb)
72+
First call returns nothing
73+
The next calls return new records since previous call (where value of track_column is greater)
74+
75+
Example:
76+
77+
>>> query = con.databases.my_db.tables.sales.filter(type='house').track('created_at')
78+
>>> # first call returns no records
79+
>>> df = query.fetch()
80+
>>> # second call returns rows with created_at is greater since previous fetch
81+
>>> df = query.fetch()
82+
83+
:param column: column to track new data from table.
84+
:return: Table object
85+
"""
86+
query = copy.deepcopy(self)
87+
query._track_column = column
88+
89+
query._update_query()
90+
return query
91+
6692
def _update_query(self):
93+
where = dict_to_binary_op(self._filters)
94+
if self._track_column is not None:
95+
condition = BinaryOperation(op='>', args=[Identifier(self._track_column), Last()])
96+
where = add_condition(where, condition)
97+
6798
ast_query = Select(
6899
targets=[Star()],
69100
from_table=Identifier(self.name),
70-
where=dict_to_binary_op(self._filters)
101+
where=where
71102
)
72103
if self._limit is not None:
73104
ast_query.limit = Constant(self._limit)

mindsdb_sdk/utils/sql.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
1-
from mindsdb_sql.parser.ast import *
1+
from mindsdb_sql.parser.ast import BinaryOperation, Identifier, Constant
22

33

44
def dict_to_binary_op(filters):
55
where = None
66
for name, value in filters.items():
7-
where1 = BinaryOperation('=', args=[Identifier(name), Constant(value)])
8-
if where is None:
9-
where = where1
10-
else:
11-
where = BinaryOperation(
12-
'and',
13-
args=[where, where1]
14-
)
7+
condition = BinaryOperation('=', args=[Identifier(name), Constant(value)])
8+
9+
where = add_condition(where, condition)
10+
1511
return where
1612

1713

14+
def add_condition(where, condition):
15+
if where is None:
16+
return condition
17+
else:
18+
return BinaryOperation(
19+
'and',
20+
args=[where, condition]
21+
)
1822

1923

tests/test_sdk.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,15 @@ def check_model(self, model, database, mock_post):
169169
def check_table(self, table, mock_post):
170170
response_mock(mock_post, pd.DataFrame([{'x': 'a'}]))
171171

172-
table = table.filter(a=3, b='2')
173-
table = table.limit(3)
174-
table.fetch()
175-
str(table)
176-
check_sql_call(mock_post, f'SELECT * FROM {table.name} WHERE a = 3 AND b = \'2\' LIMIT 3')
172+
table2 = table.filter(a=3, b='2').limit(3)
173+
table2.fetch()
174+
str(table2)
175+
check_sql_call(mock_post, f'SELECT * FROM {table2.name} WHERE a = 3 AND b = \'2\' LIMIT 3')
176+
177+
# last
178+
table2 = table.filter(a=3).track('type')
179+
table2.fetch()
180+
check_sql_call(mock_post, f'SELECT * FROM {table2.name} WHERE a = 3 AND type > last')
177181

178182

179183
class Test(BaseFlow):
@@ -1040,14 +1044,14 @@ def check_project_jobs(self, project, mock_post):
10401044
project.jobs.create(
10411045
name='job2',
10421046
query_str='retrain m1',
1043-
repeat_str='1 min',
1047+
repeat_min=1,
10441048
start_at=dt.datetime(2025, 2, 5, 11, 22),
10451049
end_at=dt.date(2030, 1, 2)
10461050
)
10471051

10481052
check_sql_call(
10491053
mock_post,
1050-
f"CREATE JOB job2 (retrain m1) START '2025-02-05 11:22:00' END '2030-01-02 00:00:00' EVERY 1 min",
1054+
f"CREATE JOB job2 (retrain m1) START '2025-02-05 11:22:00' END '2030-01-02 00:00:00' EVERY 1 minutes",
10511055
call_stack_num=-2
10521056
)
10531057

0 commit comments

Comments
 (0)