Skip to content

Commit f76f450

Browse files
author
julien
committed
Added support for inserts and updates. Resolves #2
1 parent ebac595 commit f76f450

14 files changed

+512
-270
lines changed

subprojects/trino-redisearch/src/main/java/com/redis/trino/RediSearchConfig.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ public class RediSearchConfig {
1313

1414
public static final String DEFAULT_SCHEMA = "default";
1515
public static final long DEFAULT_LIMIT = 10000;
16-
16+
1717
private String defaultSchema = DEFAULT_SCHEMA;
1818
private Optional<String> uri = Optional.empty();
19+
private boolean caseInsensitiveNameMatching;
1920
private long defaultLimit = DEFAULT_LIMIT;
20-
21+
2122
public long getDefaultLimit() {
2223
return defaultLimit;
2324
}
@@ -54,4 +55,15 @@ public RediSearchConfig setUri(String uri) {
5455
return this;
5556
}
5657

58+
public boolean isCaseInsensitiveNameMatching() {
59+
return caseInsensitiveNameMatching;
60+
}
61+
62+
@Config("redisearch.case-insensitive-name-matching")
63+
@ConfigDescription("Case-insensitive name-matching")
64+
public RediSearchConfig setCaseInsensitiveNameMatching(boolean caseInsensitiveNameMatching) {
65+
this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
66+
return this;
67+
}
68+
5769
}

subprojects/trino-redisearch/src/main/java/com/redis/trino/RediSearchInsertTableHandle.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,24 @@
1010

1111
import static java.util.Objects.requireNonNull;
1212

13-
public class RediSearchInsertTableHandle
14-
implements ConnectorInsertTableHandle
15-
{
16-
private final SchemaTableName schemaTableName;
17-
private final List<RediSearchColumnHandle> columns;
13+
public class RediSearchInsertTableHandle implements ConnectorInsertTableHandle {
14+
private final SchemaTableName schemaTableName;
15+
private final List<RediSearchColumnHandle> columns;
1816

19-
@JsonCreator
20-
public RediSearchInsertTableHandle(
21-
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
22-
@JsonProperty("columns") List<RediSearchColumnHandle> columns)
23-
{
24-
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
25-
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
26-
}
17+
@JsonCreator
18+
public RediSearchInsertTableHandle(@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
19+
@JsonProperty("columns") List<RediSearchColumnHandle> columns) {
20+
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
21+
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
22+
}
2723

28-
@JsonProperty
29-
public SchemaTableName getSchemaTableName()
30-
{
31-
return schemaTableName;
32-
}
24+
@JsonProperty
25+
public SchemaTableName getSchemaTableName() {
26+
return schemaTableName;
27+
}
3328

34-
@JsonProperty
35-
public List<RediSearchColumnHandle> getColumns()
36-
{
37-
return columns;
38-
}
29+
@JsonProperty
30+
public List<RediSearchColumnHandle> getColumns() {
31+
return columns;
32+
}
3933
}

subprojects/trino-redisearch/src/main/java/com/redis/trino/RediSearchMetadata.java

Lines changed: 85 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,31 @@
11
package com.redis.trino;
22

3+
import static com.google.common.base.Preconditions.checkState;
4+
import static com.google.common.collect.ImmutableList.toImmutableList;
35
import static java.lang.Math.toIntExact;
46
import static java.util.Objects.requireNonNull;
57
import static java.util.stream.Collectors.toList;
68

9+
import java.util.Collection;
710
import java.util.List;
811
import java.util.Map;
912
import java.util.Optional;
1013
import java.util.OptionalInt;
14+
import java.util.concurrent.atomic.AtomicReference;
1115

1216
import com.google.common.collect.ImmutableList;
1317
import com.google.common.collect.ImmutableMap;
1418

19+
import io.airlift.slice.Slice;
1520
import io.trino.spi.connector.ColumnHandle;
1621
import io.trino.spi.connector.ColumnMetadata;
22+
import io.trino.spi.connector.ConnectorInsertTableHandle;
1723
import io.trino.spi.connector.ConnectorMetadata;
24+
import io.trino.spi.connector.ConnectorOutputMetadata;
25+
import io.trino.spi.connector.ConnectorOutputTableHandle;
1826
import io.trino.spi.connector.ConnectorSession;
1927
import io.trino.spi.connector.ConnectorTableHandle;
28+
import io.trino.spi.connector.ConnectorTableLayout;
2029
import io.trino.spi.connector.ConnectorTableMetadata;
2130
import io.trino.spi.connector.ConnectorTableProperties;
2231
import io.trino.spi.connector.Constraint;
@@ -27,12 +36,13 @@
2736
import io.trino.spi.connector.SchemaTablePrefix;
2837
import io.trino.spi.connector.TableNotFoundException;
2938
import io.trino.spi.predicate.TupleDomain;
39+
import io.trino.spi.statistics.ComputedStatistics;
3040

3141
public class RediSearchMetadata implements ConnectorMetadata {
3242

3343
private final RediSearchSession rediSearchSession;
34-
// private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();
3544
private final String schemaName;
45+
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();
3646

3747
public RediSearchMetadata(RediSearchSession rediSearchSession) {
3848
this.rediSearchSession = requireNonNull(rediSearchSession, "rediSearchSession is null");
@@ -114,61 +124,69 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
114124
return ((RediSearchColumnHandle) columnHandle).toColumnMetadata();
115125
}
116126

117-
// @Override
118-
// public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) {
119-
// rediSearchSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata));
120-
// }
121-
//
122-
// @Override
123-
// public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {
124-
// RediSearchTableHandle table = (RediSearchTableHandle) tableHandle;
125-
//
126-
// rediSearchSession.dropTable(table.getSchemaTableName());
127-
// }
128-
//
129-
// @Override
130-
// public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata,
131-
// Optional<ConnectorNewTableLayout> layout) {
132-
// List<RediSearchColumnHandle> columns = buildColumnHandles(tableMetadata);
133-
//
134-
// rediSearchSession.createTable(tableMetadata.getTable(), columns);
135-
//
136-
// setRollback(() -> rediSearchSession.dropTable(tableMetadata.getTable()));
137-
//
138-
// return new RediSearchOutputTableHandle(tableMetadata.getTable(),
139-
// columns.stream().filter(c -> !c.isHidden()).collect(toList()));
140-
// }
141-
142-
// @Override
143-
// public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session,
144-
// ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments,
145-
// Collection<ComputedStatistics> computedStatistics) {
146-
// clearRollback();
147-
// return Optional.empty();
148-
// }
149-
150-
// @Override
151-
// public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) {
152-
// RediSearchTableHandle table = (RediSearchTableHandle) tableHandle;
153-
// List<RediSearchColumnHandle> columns = rediSearchSession.getTable(table.getSchemaTableName()).getColumns();
154-
//
155-
// return new RediSearchInsertTableHandle(table.getSchemaTableName(),
156-
// columns.stream().filter(column -> !column.isHidden())
157-
// .peek(column -> validateColumnNameForInsert(column.getName())).collect(toImmutableList()));
158-
// }
159-
//
160-
// @Override
161-
// public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
162-
// ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
163-
// Collection<ComputedStatistics> computedStatistics) {
164-
// return Optional.empty();
165-
// }
127+
@Override
128+
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) {
129+
rediSearchSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata));
130+
}
166131

167132
@Override
168-
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) {
169-
RediSearchTableHandle tableHandle = (RediSearchTableHandle) table;
133+
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {
134+
RediSearchTableHandle table = (RediSearchTableHandle) tableHandle;
135+
rediSearchSession.dropTable(table.getSchemaTableName());
136+
}
137+
138+
@Override
139+
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) {
140+
rediSearchSession.addColumn(((RediSearchTableHandle) tableHandle).getSchemaTableName(), column);
141+
}
142+
143+
@Override
144+
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) {
145+
rediSearchSession.dropColumn(((RediSearchTableHandle) tableHandle).getSchemaTableName(),
146+
((RediSearchColumnHandle) column).getName());
147+
}
148+
149+
@Override
150+
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata,
151+
Optional<ConnectorTableLayout> layout) {
152+
List<RediSearchColumnHandle> columns = buildColumnHandles(tableMetadata);
153+
154+
rediSearchSession.createTable(tableMetadata.getTable(), columns);
155+
156+
setRollback(() -> rediSearchSession.dropTable(tableMetadata.getTable()));
157+
158+
return new RediSearchOutputTableHandle(tableMetadata.getTable(),
159+
columns.stream().filter(c -> !c.isHidden()).collect(toList()));
160+
}
161+
162+
@Override
163+
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session,
164+
ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments,
165+
Collection<ComputedStatistics> computedStatistics) {
166+
clearRollback();
167+
return Optional.empty();
168+
}
169+
170+
@Override
171+
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) {
172+
RediSearchTableHandle table = (RediSearchTableHandle) tableHandle;
173+
List<RediSearchColumnHandle> columns = rediSearchSession.getTable(table.getSchemaTableName()).getColumns();
174+
175+
return new RediSearchInsertTableHandle(table.getSchemaTableName(),
176+
columns.stream().filter(column -> !column.isHidden()).collect(toImmutableList()));
177+
}
178+
179+
@Override
180+
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
181+
ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
182+
Collection<ComputedStatistics> computedStatistics) {
183+
return Optional.empty();
184+
}
170185

171-
return new ConnectorTableProperties(tableHandle.getConstraint(), Optional.empty(), Optional.empty(),
186+
@Override
187+
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) {
188+
RediSearchTableHandle handle = (RediSearchTableHandle) table;
189+
return new ConnectorTableProperties(handle.getConstraint(), Optional.empty(), Optional.empty(),
172190
Optional.empty(), ImmutableList.of());
173191
}
174192

@@ -205,17 +223,17 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
205223
return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary(), false));
206224
}
207225

208-
// private void setRollback(Runnable action) {
209-
// checkState(rollbackAction.compareAndSet(null, action), "rollback action is already set");
210-
// }
211-
//
212-
// private void clearRollback() {
213-
// rollbackAction.set(null);
214-
// }
226+
private void setRollback(Runnable action) {
227+
checkState(rollbackAction.compareAndSet(null, action), "rollback action is already set");
228+
}
215229

216-
// public void rollback() {
217-
// Optional.ofNullable(rollbackAction.getAndSet(null)).ifPresent(Runnable::run);
218-
// }
230+
private void clearRollback() {
231+
rollbackAction.set(null);
232+
}
233+
234+
public void rollback() {
235+
Optional.ofNullable(rollbackAction.getAndSet(null)).ifPresent(Runnable::run);
236+
}
219237

220238
private static SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
221239
return ((RediSearchTableHandle) tableHandle).getSchemaTableName();
@@ -231,14 +249,8 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
231249
return new ConnectorTableMetadata(tableName, columns);
232250
}
233251

234-
// private static List<RediSearchColumnHandle> buildColumnHandles(ConnectorTableMetadata tableMetadata) {
235-
// return tableMetadata.getColumns().stream()
236-
// .map(m -> new RediSearchColumnHandle(m.getName(), m.getType(), m.isHidden())).collect(toList());
237-
// }
238-
239-
// private static void validateColumnNameForInsert(String columnName) {
240-
// if (columnName.contains("$") || columnName.contains(".")) {
241-
// throw new IllegalArgumentException("Column name must not contain '$' or '.' for INSERT: " + columnName);
242-
// }
243-
// }
252+
private static List<RediSearchColumnHandle> buildColumnHandles(ConnectorTableMetadata tableMetadata) {
253+
return tableMetadata.getColumns().stream()
254+
.map(m -> new RediSearchColumnHandle(m.getName(), m.getType(), m.isHidden())).collect(toList());
255+
}
244256
}

subprojects/trino-redisearch/src/main/java/com/redis/trino/RediSearchOutputTableHandle.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,24 @@
1010

1111
import static java.util.Objects.requireNonNull;
1212

13-
public class RediSearchOutputTableHandle
14-
implements ConnectorOutputTableHandle
15-
{
16-
private final SchemaTableName schemaTableName;
17-
private final List<RediSearchColumnHandle> columns;
13+
public class RediSearchOutputTableHandle implements ConnectorOutputTableHandle {
14+
private final SchemaTableName schemaTableName;
15+
private final List<RediSearchColumnHandle> columns;
1816

19-
@JsonCreator
20-
public RediSearchOutputTableHandle(
21-
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
22-
@JsonProperty("columns") List<RediSearchColumnHandle> columns)
23-
{
24-
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
25-
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
26-
}
17+
@JsonCreator
18+
public RediSearchOutputTableHandle(@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
19+
@JsonProperty("columns") List<RediSearchColumnHandle> columns) {
20+
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
21+
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
22+
}
2723

28-
@JsonProperty
29-
public SchemaTableName getSchemaTableName()
30-
{
31-
return schemaTableName;
32-
}
24+
@JsonProperty
25+
public SchemaTableName getSchemaTableName() {
26+
return schemaTableName;
27+
}
3328

34-
@JsonProperty
35-
public List<RediSearchColumnHandle> getColumns()
36-
{
37-
return columns;
38-
}
29+
@JsonProperty
30+
public List<RediSearchColumnHandle> getColumns() {
31+
return columns;
32+
}
3933
}

subprojects/trino-redisearch/src/main/java/com/redis/trino/RediSearchPageSink.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import io.trino.spi.type.TimestampWithTimeZoneType;
5252
import io.trino.spi.type.TinyintType;
5353
import io.trino.spi.type.Type;
54+
import io.trino.spi.type.VarbinaryType;
5455
import io.trino.spi.type.VarcharType;
5556

5657
public class RediSearchPageSink implements ConnectorPageSink {
@@ -78,7 +79,11 @@ public CompletableFuture<?> appendPage(Page page) {
7879
String key = schemaTableName.getTableName() + ":" + factory.create().toString();
7980
for (int channel = 0; channel < page.getChannelCount(); channel++) {
8081
RediSearchColumnHandle column = columns.get(channel);
81-
map.put(column.getName(), getValue(columns.get(channel).getType(), page.getBlock(channel), position));
82+
Block block = page.getBlock(channel);
83+
if (block.isNull(position)) {
84+
continue;
85+
}
86+
map.put(column.getName(), getObjectValue(columns.get(channel).getType(), block, position));
8287
}
8388
RedisFuture<Long> future = async.hset(key, map);
8489
futures.add(future);
@@ -89,11 +94,7 @@ public CompletableFuture<?> appendPage(Page page) {
8994
return NOT_BLOCKED;
9095
}
9196

92-
@SuppressWarnings("deprecation")
93-
private String getValue(Type type, Block block, int position) {
94-
if (block.isNull(position)) {
95-
return null;
96-
}
97+
private String getObjectValue(Type type, Block block, int position) {
9798
if (type.equals(BooleanType.BOOLEAN)) {
9899
return String.valueOf(type.getBoolean(block, position));
99100
}
@@ -121,13 +122,16 @@ private String getValue(Type type, Block block, int position) {
121122
if (type instanceof CharType) {
122123
return padSpaces(type.getSlice(block, position), ((CharType) type)).toStringUtf8();
123124
}
125+
if (type.equals(VarbinaryType.VARBINARY)) {
126+
return new String(type.getSlice(block, position).getBytes());
127+
}
124128
if (type.equals(DateType.DATE)) {
125129
long days = type.getLong(block, position);
126130
return DateTimeFormatter.ISO_DATE.format(LocalDate.ofEpochDay(days));
127131
}
128-
if (type.equals(TimeType.TIME)) {
132+
if (type.equals(TimeType.TIME_MILLIS)) {
129133
long picos = type.getLong(block, position);
130-
return String.valueOf((roundDiv(picos, PICOSECONDS_PER_MILLISECOND)));
134+
return String.valueOf(roundDiv(picos, PICOSECONDS_PER_MILLISECOND));
131135
}
132136
if (type.equals(TIMESTAMP_MILLIS)) {
133137
long millisUtc = floorDiv(type.getLong(block, position), MICROSECONDS_PER_MILLISECOND);

0 commit comments

Comments
 (0)