|
1 | 1 | package com.redis.trino; |
2 | 2 |
|
3 | 3 | import static com.google.common.base.Preconditions.checkArgument; |
| 4 | +import static com.google.common.base.Throwables.throwIfInstanceOf; |
4 | 5 | import static com.google.common.base.Verify.verify; |
5 | 6 | import static io.trino.spi.type.BigintType.BIGINT; |
6 | 7 | import static io.trino.spi.type.DoubleType.DOUBLE; |
|
10 | 11 | import static java.lang.Math.toIntExact; |
11 | 12 | import static java.lang.String.format; |
12 | 13 | import static java.util.Objects.requireNonNull; |
| 14 | +import static java.util.concurrent.TimeUnit.HOURS; |
| 15 | +import static java.util.concurrent.TimeUnit.MINUTES; |
13 | 16 |
|
14 | 17 | import java.util.ArrayList; |
15 | 18 | import java.util.Collections; |
|
20 | 23 | import java.util.stream.Collectors; |
21 | 24 |
|
22 | 25 | import com.google.common.annotations.VisibleForTesting; |
| 26 | +import com.google.common.cache.CacheBuilder; |
| 27 | +import com.google.common.cache.CacheLoader; |
| 28 | +import com.google.common.cache.LoadingCache; |
23 | 29 | import com.google.common.collect.ImmutableList; |
24 | 30 | import com.google.common.collect.ImmutableSet; |
25 | 31 | import com.google.common.primitives.Primitives; |
26 | 32 | import com.google.common.primitives.Shorts; |
27 | 33 | import com.google.common.primitives.SignedBytes; |
| 34 | +import com.google.common.util.concurrent.UncheckedExecutionException; |
28 | 35 | import com.redis.lettucemod.RedisModulesUtils; |
29 | 36 | import com.redis.lettucemod.api.StatefulRedisModulesConnection; |
30 | 37 | import com.redis.lettucemod.search.CreateOptions; |
|
43 | 50 | import io.redisearch.querybuilder.Value; |
44 | 51 | import io.redisearch.querybuilder.Values; |
45 | 52 | import io.trino.spi.HostAddress; |
| 53 | +import io.trino.spi.TrinoException; |
46 | 54 | import io.trino.spi.connector.ColumnHandle; |
47 | 55 | import io.trino.spi.connector.ColumnMetadata; |
48 | 56 | import io.trino.spi.connector.SchemaNotFoundException; |
@@ -72,12 +80,16 @@ public class RediSearchSession { |
72 | 80 | private final TypeManager typeManager; |
73 | 81 | private final StatefulRedisModulesConnection<String, String> connection; |
74 | 82 | private final RediSearchConfig config; |
| 83 | + private final LoadingCache<SchemaTableName, RediSearchTable> tableCache; |
75 | 84 |
|
76 | 85 | public RediSearchSession(TypeManager typeManager, StatefulRedisModulesConnection<String, String> connection, |
77 | 86 | RediSearchConfig config) { |
78 | 87 | this.typeManager = requireNonNull(typeManager, "typeManager is null"); |
79 | 88 | this.connection = requireNonNull(connection, "connection is null"); |
80 | 89 | this.config = requireNonNull(config, "config is null"); |
| 90 | + // TODO make table cache expiration configurable |
| 91 | + this.tableCache = CacheBuilder.newBuilder().expireAfterWrite(1, HOURS).refreshAfterWrite(1, MINUTES) |
| 92 | + .build(CacheLoader.from(this::loadTableSchema)); |
81 | 93 | } |
82 | 94 |
|
83 | 95 | public StatefulRedisModulesConnection<String, String> getConnection() { |
@@ -107,7 +119,12 @@ public Set<String> getAllTables() throws SchemaNotFoundException { |
107 | 119 | } |
108 | 120 |
|
109 | 121 | public RediSearchTable getTable(SchemaTableName tableName) throws TableNotFoundException { |
110 | | - return loadTableSchema(tableName); |
| 122 | + try { |
| 123 | + return tableCache.getUnchecked(tableName); |
| 124 | + } catch (UncheckedExecutionException e) { |
| 125 | + throwIfInstanceOf(e.getCause(), TrinoException.class); |
| 126 | + throw e; |
| 127 | + } |
111 | 128 | } |
112 | 129 |
|
113 | 130 | public void createTable(SchemaTableName name, List<RediSearchColumnHandle> columns) { |
@@ -167,7 +184,7 @@ public SearchResults<String, String> execute(RediSearchTableHandle tableHandle, |
167 | 184 | String index = tableHandle.getSchemaTableName().getTableName(); |
168 | 185 | String query = buildQuery(tableHandle.getConstraint()); |
169 | 186 | Builder<String, String> options = SearchOptions.<String, String>builder(); |
170 | | - tableHandle.getLimit().ifPresent(num -> options.limit(Limit.of(0, num))); |
| 187 | + options.limit(Limit.of(0, tableHandle.getLimit().isPresent() ? tableHandle.getLimit().getAsInt() : 1000)); |
171 | 188 | log.debug("Find documents: index: %s, query: %s", index, query); |
172 | 189 | return connection.sync().search(index, query, options.build()); |
173 | 190 | } |
@@ -227,7 +244,7 @@ private static Optional<Node> buildPredicate(RediSearchColumnHandle column, Doma |
227 | 244 |
|
228 | 245 | // Add back all of the possible single values either as an equality or an IN |
229 | 246 | // predicate |
230 | | - |
| 247 | + |
231 | 248 | if (singleValues.size() == 1) { |
232 | 249 | disjuncts.add(QueryBuilder.intersect(name, value(singleValues.get(0), type))); |
233 | 250 | } else { |
@@ -263,9 +280,7 @@ private static Value value(Object trinoNativeValue, Type type) { |
263 | 280 | return Values.eq((Long) trinoNativeValue); |
264 | 281 | } |
265 | 282 | if (type instanceof VarcharType) { |
266 | | - // TODO introduce RediSearch Field type to know which to use (tag, text, |
267 | | - // numeric) |
268 | | - return Values.value((String) trinoNativeValue); |
| 283 | + return Values.tags((String) trinoNativeValue); |
269 | 284 |
|
270 | 285 | } |
271 | 286 | throw new UnsupportedOperationException("Type " + type + " not supported"); |
|
0 commit comments