Skip to content

Commit 6bdca04

Browse files
authored
Merge pull request #101 from mindsdb/jobs
Job supports SDK comands
2 parents 2f8e7fd + 0eaec59 commit 6bdca04

File tree

8 files changed

+410
-138
lines changed

8 files changed

+410
-138
lines changed

mindsdb_sdk/connectors/rest_api.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import requests
66
import pandas as pd
77

8-
from .. import __about__
8+
from mindsdb_sdk import __about__
99

1010

1111
def _try_relogin(fnc):
@@ -73,7 +73,11 @@ def login(self):
7373
_raise_for_status(r)
7474

7575
@_try_relogin
76-
def sql_query(self, sql, database='mindsdb', lowercase_columns=False):
76+
def sql_query(self, sql, database=None, lowercase_columns=False):
77+
78+
if database is None:
79+
# it means the database is included in query
80+
database = 'mindsdb'
7781
url = self.url + '/api/sql/query'
7882
r = self.session.post(url, json={
7983
'query': sql,

mindsdb_sdk/jobs.py

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,32 @@
11
import datetime as dt
22
from typing import Union, List
33

4+
45
import pandas as pd
56

67
from mindsdb_sql.parser.dialects.mindsdb import CreateJob, DropJob
78
from mindsdb_sql.parser.ast import Identifier, Star, Select
89

10+
from mindsdb_sdk.query import Query
911
from mindsdb_sdk.utils.sql import dict_to_binary_op
1012
from mindsdb_sdk.utils.objects_collection import CollectionBase
13+
from mindsdb_sdk.utils.context import set_saving
1114

1215

1316
class Job:
14-
def __init__(self, project, data):
17+
def __init__(self, project, name, data=None, create_callback=None):
1518
self.project = project
19+
self.name = name
1620
self.data = data
17-
self._update(data)
21+
22+
self.query_str = None
23+
if data is not None:
24+
self._update(data)
25+
self._queries = []
26+
self._create_callback = create_callback
1827

1928
def _update(self, data):
20-
self.name = data['name']
29+
# self.name = data['name']
2130
self.query_str = data['query']
2231
self.start_at = data['start_at']
2332
self.end_at = data['end_at']
@@ -27,13 +36,55 @@ def _update(self, data):
2736
def __repr__(self):
2837
return f"{self.__class__.__name__}({self.name}, query='{self.query_str}')"
2938

39+
def __enter__(self):
40+
if self._create_callback is None:
41+
raise ValueError("The job is already created and can't be used to create context."
42+
" To be able to use context: create job without 'query_str' parameter: "
43+
"\n>>> with con.jobs.create('j1') as job:"
44+
"\n>>> job.add_query(...)")
45+
set_saving(f'job-{self.name}')
46+
47+
return self
48+
49+
def __exit__(self, type, value, traceback):
50+
set_saving(None)
51+
if type is None:
52+
if len(self._queries) == 0:
53+
raise ValueError('No queries were added to job')
54+
55+
query_str = '; '.join(self._queries)
56+
57+
self._create_callback(query_str)
58+
59+
self.refresh()
60+
3061
def refresh(self):
3162
"""
3263
Retrieve job data from mindsdb server
3364
"""
3465
job = self.project.get_job(self.name)
3566
self._update(job.data)
3667

68+
def add_query(self, query: Union[Query, str]):
69+
"""
70+
Add a query to job. Method is used in context of the job
71+
72+
>>> with con.jobs.create('j1') as job:
73+
>>> job.add_query(table1.insert(table2))
74+
75+
:param query: string or Query object. Query.database should be emtpy or the same as job's project
76+
"""
77+
if isinstance(query, Query):
78+
79+
if query.database is not None and query.database != self.project.name:
80+
# we can't execute this query in jobs project
81+
raise ValueError(f"Wrong query database: {query.database}. You could try to use sql string instead")
82+
83+
query = query.sql
84+
elif not isinstance(query, str):
85+
raise ValueError(f'Unable to use add this object as a query: {query}. Try to use sql string instead')
86+
self._queries.append(query)
87+
3788
def get_history(self) -> pd.DataFrame:
3889
"""
3990
Get history of job execution
@@ -69,7 +120,7 @@ def _list(self, name: str = None) -> List[Job]:
69120
df = df.rename(columns=cols_map)
70121

71122
return [
72-
Job(self.project, item)
123+
Job(self.project, item.pop('name'), item)
73124
for item in df.to_dict('records')
74125
]
75126

@@ -101,7 +152,7 @@ def get(self, name: str) -> Job:
101152
def create(
102153
self,
103154
name: str,
104-
query_str: str,
155+
query_str: str = None,
105156
start_at: dt.datetime = None,
106157
end_at: dt.datetime = None,
107158
repeat_str: str = None,
@@ -113,7 +164,25 @@ def create(
113164
If it is not possible (job executed and not accessible anymore):
114165
return None
115166
116-
More info: https://docs.mindsdb.com/sql/create/jobs
167+
Usage options:
168+
169+
Option 1: to use string query
170+
All job tasks could be passed as string with sql queries. Job is created immediately
171+
172+
>>> job = con.jobs.create('j1', query_str='retrain m1; show models', repeat_min=1):
173+
174+
Option 2: to use 'with' block.
175+
It allows to pass sdk commands to job tasks.
176+
Not all sdk commands could be accepted here,
177+
only those which are converted in to sql in sdk and sent to /query endpoint
178+
Adding query sql string is accepted as well
179+
Job will be created after exit from 'with' block
180+
181+
>>> with con.jobs.create('j1', repeat_min=1) as job:
182+
>>> job.add_query(table1.insert(table2))
183+
>>> job.add_query('retrain m1') # using string
184+
185+
More info about jobs: https://docs.mindsdb.com/sql/create/jobs
117186
118187
:param name: name of the job
119188
:param query_str: str, job's query (or list of queries with ';' delimiter) which job have to execute
@@ -137,20 +206,30 @@ def create(
137206
if repeat_min is not None:
138207
repeat_str = f'{repeat_min} minutes'
139208

140-
ast_query = CreateJob(
141-
name=Identifier(name),
142-
query_str=query_str,
143-
start_str=start_str,
144-
end_str=end_str,
145-
repeat_str=repeat_str
146-
)
209+
def _create_callback(query):
210+
ast_query = CreateJob(
211+
name=Identifier(name),
212+
query_str=query,
213+
start_str=start_str,
214+
end_str=end_str,
215+
repeat_str=repeat_str
216+
)
217+
218+
self.api.sql_query(ast_query.to_string(), database=self.project.name)
219+
220+
if query_str is None:
221+
# allow to create context with job
222+
job = Job(self.project, name, create_callback=_create_callback)
223+
return job
224+
else:
225+
# create it
226+
_create_callback(query_str)
147227

148-
self.api.sql_query(ast_query.to_string(), database=self.project.name)
228+
# job can be executed and remove it is not repeatable
229+
jobs = self._list(name)
230+
if len(jobs) == 1:
231+
return jobs[0]
149232

150-
# job can be executed and remove it is not repeatable
151-
jobs = self._list(name)
152-
if len(jobs) == 1:
153-
return jobs[0]
154233

155234
def drop(self, name: str):
156235
"""

mindsdb_sdk/knowledge_bases.py

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
from mindsdb_sql.parser.dialects.mindsdb import CreateKnowledgeBase, DropKnowledgeBase
88
from mindsdb_sql.parser.ast import Identifier, Star, Select, BinaryOperation, Constant, Insert
99

10-
from mindsdb_sdk.utils.sql import dict_to_binary_op
10+
from mindsdb_sdk.utils.sql import dict_to_binary_op, query_to_native_query
1111
from mindsdb_sdk.utils.objects_collection import CollectionBase
12+
from mindsdb_sdk.utils.context import is_saving
1213

1314
from .models import Model
1415
from .tables import Table
@@ -35,6 +36,7 @@ def __init__(self, project, data: dict):
3536

3637
self.project = project
3738
self.name = data['name']
39+
self.table_name = Identifier(parts=[self.project.name, self.name])
3840

3941
self.storage = None
4042
if data['storage'] is not None:
@@ -69,10 +71,10 @@ def __init__(self, project, data: dict):
6971
self._query = None
7072
self._limit = None
7173

72-
database = project.name
7374
self._update_query()
7475

75-
super().__init__(project.api, self.sql, database)
76+
# empty database
77+
super().__init__(project.api, self.sql, None)
7678

7779
def __repr__(self):
7880
return f'{self.__class__.__name__}({self.project.name}.{self.name})'
@@ -104,9 +106,7 @@ def _update_query(self):
104106

105107
ast_query = Select(
106108
targets=[Star()],
107-
from_table=Identifier(parts=[
108-
self.project.name, self.name
109-
])
109+
from_table=self.table_name
110110
)
111111
if self._query is not None:
112112
ast_query.where = BinaryOperation(op='=', args=[
@@ -143,20 +143,28 @@ def insert(self, data: Union[pd.DataFrame, Query, dict]):
143143
data_split = data.to_dict('split')
144144

145145
ast_query = Insert(
146-
table=Identifier(self.name),
146+
table=Identifier(self.table_name),
147147
columns=data_split['columns'],
148148
values=data_split['data']
149149
)
150-
151150
sql = ast_query.to_string()
152-
self.api.sql_query(sql, self.database)
151+
153152
else:
154153
# insert from select
155-
table = Identifier(parts=[self.database, self.name])
156-
self.api.sql_query(
157-
f'INSERT INTO {table.to_string()} ({data.sql})',
158-
database=data.database
159-
)
154+
if data.database is not None:
155+
ast_query = Insert(
156+
table=Identifier(self.table_name),
157+
from_select=query_to_native_query(data)
158+
)
159+
sql = ast_query.to_string()
160+
else:
161+
sql = f'INSERT INTO {self.table_name.to_string()} ({data.sql})'
162+
163+
if is_saving():
164+
# don't execute it right now, return query object
165+
return Query(self, sql, self.database)
166+
167+
self.api.sql_query(sql, self.database)
160168

161169

162170
class KnowledgeBases(CollectionBase):
@@ -241,7 +249,7 @@ def create(
241249
content_columns: list = None,
242250
id_column: str = None,
243251
params: dict = None,
244-
) -> KnowledgeBase:
252+
) -> Union[KnowledgeBase, Query]:
245253
"""
246254
247255
Create knowledge base:
@@ -291,13 +299,17 @@ def create(
291299
storage_name = None
292300

293301
ast_query = CreateKnowledgeBase(
294-
Identifier(name),
302+
Identifier(parts=[self.project.name, name]),
295303
model=model_name,
296304
storage=storage_name,
297305
params=params_out
298306
)
307+
sql = ast_query.to_string()
299308

300-
self.api.sql_query(ast_query.to_string(), database=self.project.name)
309+
if is_saving():
310+
return Query(self, sql)
311+
312+
self.api.sql_query(sql)
301313

302314
return self.get(name)
303315

@@ -308,6 +320,10 @@ def drop(self, name: str):
308320
:return:
309321
"""
310322

311-
ast_query = DropKnowledgeBase(Identifier(name))
323+
ast_query = DropKnowledgeBase(Identifier(parts=[self.project.name, name]))
324+
sql = ast_query.to_string()
325+
326+
if is_saving():
327+
return Query(self, sql)
312328

313-
self.api.sql_query(ast_query.to_string())
329+
self.api.sql_query(sql)

0 commit comments

Comments
 (0)