Skip to content

Commit 7fc2306

Browse files
committed
feat: Mongock import to FlamingockCommunity(WIP: dynamodb)
1 parent fbabb8a commit 7fc2306

File tree

11 files changed

+821
-5
lines changed

11 files changed

+821
-5
lines changed

legacy/mongock-importer-dynamodb/build.gradle.kts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,22 @@ dependencies {
33
implementation(project(":utils:dynamodb-util"))
44

55
compileOnly("software.amazon.awssdk:dynamodb-enhanced:2.25.29")
6+
7+
testAnnotationProcessor(project(":core:flamingock-processor"))
8+
testAnnotationProcessor(project(":legacy:mongock-support"))
9+
testImplementation(project(":legacy:mongock-support"))
10+
11+
testImplementation(project(":community:flamingock-auditstore-dynamodb"))
12+
13+
14+
testImplementation(project(":utils:test-util"))
15+
16+
testImplementation("org.testcontainers:junit-jupiter:1.18.3")
17+
testImplementation("org.mockito:mockito-inline:4.11.0")
18+
testImplementation("org.testcontainers:localstack:1.19.7")
19+
testImplementation("software.amazon.awssdk:dynamodb:2.25.61")
20+
21+
testImplementation("org.mockito:mockito-inline:4.11.0")
622
}
723

824

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2023 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.importer.mongock.dynamodb;
17+
18+
import io.flamingock.api.annotations.EnableFlamingock;
19+
import io.flamingock.api.annotations.Stage;
20+
import io.flamingock.community.dynamodb.driver.DynamoDBAuditStore;
21+
import io.flamingock.importer.dynamodb.MongockDynamoDBAuditEntry;
22+
import io.flamingock.internal.common.core.audit.AuditEntry;
23+
import io.flamingock.internal.common.core.error.FlamingockException;
24+
import io.flamingock.internal.core.builder.FlamingockFactory;
25+
import io.flamingock.internal.core.runner.Runner;
26+
import io.flamingock.support.mongock.annotations.MongockSupport;
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Disabled;
31+
import org.junit.jupiter.api.Test;
32+
import org.testcontainers.containers.GenericContainer;
33+
import org.testcontainers.junit.jupiter.Container;
34+
import org.testcontainers.junit.jupiter.Testcontainers;
35+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
36+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
37+
import software.amazon.awssdk.regions.Region;
38+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
39+
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
40+
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
41+
42+
import java.net.URI;
43+
import java.time.Instant;
44+
import java.util.Arrays;
45+
import java.util.HashMap;
46+
import java.util.List;
47+
48+
import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME;
49+
import static org.junit.jupiter.api.Assertions.assertEquals;
50+
import static org.junit.jupiter.api.Assertions.assertTrue;
51+
52+
@Testcontainers
53+
@MongockSupport(targetSystem = "dynamodb-target-system")
54+
@EnableFlamingock(stages = {@Stage(location = "io.flamingock.importer.mongock.dynamodb.changes")})
55+
public class DynamoDBImporterTest {
56+
57+
@Container
58+
public static final GenericContainer<?> dynamoDBContainer = new GenericContainer<>("amazon/dynamodb-local:latest")
59+
.withExposedPorts(8000);
60+
61+
public static final String MONGOCK_CHANGE_LOGS = "mongockChangeLogs";
62+
63+
64+
private static DynamoDbClient client;
65+
private DynamoDBTestHelper mongockChangeLogsHelper;
66+
67+
@BeforeAll
68+
static void beforeAll() {
69+
dynamoDBContainer.start();
70+
71+
String endpoint = String.format("http://%s:%d",
72+
dynamoDBContainer.getHost(),
73+
dynamoDBContainer.getMappedPort(8000));
74+
client = DynamoDbClient.builder()
75+
.endpointOverride(URI.create(endpoint))
76+
.region(Region.US_EAST_1)
77+
.credentialsProvider(
78+
StaticCredentialsProvider.create(
79+
AwsBasicCredentials.create("dummy", "dummy")
80+
)
81+
)
82+
.build();
83+
}
84+
85+
@BeforeEach
86+
void setUp() {
87+
mongockChangeLogsHelper = new DynamoDBTestHelper(client, MONGOCK_CHANGE_LOGS);
88+
mongockChangeLogsHelper.ensureTableExists();
89+
mongockChangeLogsHelper.resetTable();
90+
91+
new DynamoDBTestHelper(client, DEFAULT_AUDIT_STORE_NAME).ensureTableExists();
92+
new DynamoDBTestHelper(client, DEFAULT_AUDIT_STORE_NAME).resetTable();
93+
}
94+
95+
@Test
96+
@Disabled("restore when https://trello.com/c/4gEQ8Wb4/458-mongock-legacy-targetsystem done")
97+
void testImportDynamoDBChangeLogs() {
98+
List<MongockDynamoDBAuditEntry> entries = Arrays.asList(
99+
new MongockDynamoDBAuditEntry(
100+
"exec-1",
101+
"client-initializer",
102+
"author1",
103+
String.valueOf(Instant.now().toEpochMilli()),
104+
"EXECUTED",
105+
"EXECUTION",
106+
"io.flamingock.changelog.Class1",
107+
"method1",
108+
new HashMap<String, String>() {{
109+
put("meta1", "value1");
110+
}}.toString(),
111+
123L,
112+
"host1",
113+
null,
114+
true
115+
),
116+
new MongockDynamoDBAuditEntry(
117+
"exec-1",
118+
"client-updater",
119+
"author1",
120+
String.valueOf(Instant.now().toEpochMilli()),
121+
"EXECUTED",
122+
"EXECUTION",
123+
"io.flamingock.changelog.Class2",
124+
"method1",
125+
new HashMap<String, String>() {{
126+
put("meta1", "value1");
127+
}}.toString(),
128+
123L,
129+
"host1",
130+
null,
131+
true
132+
)
133+
);
134+
135+
mongockChangeLogsHelper.insertChangeEntries(entries);
136+
137+
Runner flamingock = FlamingockFactory.getCommunityBuilder()
138+
.setAuditStore(new DynamoDBAuditStore(client))
139+
.build();
140+
141+
flamingock.run();
142+
143+
List<AuditEntry> auditLog = new DynamoDBTestHelper(client, DEFAULT_AUDIT_STORE_NAME).getAuditEntriesSorted();
144+
assertEquals(6, auditLog.size());
145+
146+
for (AuditEntry entry : auditLog) {
147+
System.out.println("executionId: " + entry.getExecutionId());
148+
System.out.println("stageId: " + entry.getStageId());
149+
System.out.println("taskId: " + entry.getTaskId());
150+
System.out.println("author: " + entry.getAuthor());
151+
System.out.println("createdAt: " + entry.getCreatedAt());
152+
System.out.println("state: " + entry.getState());
153+
System.out.println("type: " + entry.getType());
154+
System.out.println("className: " + entry.getClassName());
155+
System.out.println("methodName: " + entry.getMethodName());
156+
System.out.println("executionMillis: " + entry.getExecutionMillis());
157+
System.out.println("executionHostname: " + entry.getExecutionHostname());
158+
System.out.println("metadata: " + entry.getMetadata());
159+
System.out.println("systemChange: " + entry.getSystemChange());
160+
System.out.println("errorTrace: " + entry.getErrorTrace());
161+
System.out.println("txStrategy: " + entry.getTxType());
162+
System.out.println("targetSystemId: " + entry.getTargetSystemId());
163+
System.out.println("order: " + entry.getOrder());
164+
System.out.println("-----");
165+
}
166+
167+
AuditEntry entry1 = auditLog.stream()
168+
.filter(e -> "client-updater".equals(e.getTaskId()))
169+
.findFirst()
170+
.orElseThrow(() -> new AssertionError("Entry with changeId 'client-updater' not found"));
171+
172+
173+
assertEquals("client-updater", entry1.getTaskId());
174+
assertEquals("author1", entry1.getAuthor());
175+
assertEquals("exec-1", entry1.getExecutionId());
176+
assertTrue(entry1.getSystemChange());
177+
178+
ScanResponse scanResponse = client.scan(
179+
ScanRequest.builder().tableName(DEFAULT_AUDIT_STORE_NAME).build()
180+
);
181+
assertTrue(scanResponse.count() > 0, "Audit table should not be empty");
182+
}
183+
184+
@Test
185+
@Disabled("restore when https://trello.com/c/4gEQ8Wb4/458-mongock-legacy-targetsystem done")
186+
void failIfEmptyOrigin() {
187+
Runner flamingock = FlamingockFactory.getCommunityBuilder()
188+
.setAuditStore(new DynamoDBAuditStore(client))
189+
.build();
190+
191+
Assertions.assertThrows(FlamingockException.class, flamingock::run);
192+
}
193+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2023 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.importer.mongock.dynamodb;
17+
18+
import io.flamingock.importer.dynamodb.MongockDynamoDBAuditEntry;
19+
import io.flamingock.importer.dynamodb.dynamodb.DynamoDBAuditEntryEntity;
20+
import io.flamingock.internal.common.core.audit.AuditEntry;
21+
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
22+
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
23+
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
24+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
25+
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
26+
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
27+
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
28+
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
29+
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
30+
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
31+
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
32+
import software.amazon.awssdk.services.dynamodb.model.KeyType;
33+
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
34+
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
35+
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
36+
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
37+
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
38+
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
39+
40+
import java.util.ArrayList;
41+
import java.util.HashMap;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.stream.Collectors;
45+
46+
import static io.flamingock.internal.util.constants.CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME;
47+
48+
public class DynamoDBTestHelper {
49+
50+
private final DynamoDbClient client;
51+
private final String tableName;
52+
private final DynamoDbTable<?> table;
53+
54+
public DynamoDBTestHelper(DynamoDbClient client, String tableName) {
55+
this.client = client;
56+
this.tableName = tableName;
57+
DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
58+
.dynamoDbClient(client)
59+
.build();
60+
61+
if (DEFAULT_AUDIT_STORE_NAME.equals(tableName)) {
62+
this.table = enhancedClient.table(tableName, TableSchema.fromBean(DynamoDBAuditEntryEntity.class));
63+
} else {
64+
this.table = enhancedClient.table(tableName, TableSchema.fromBean(MongockDynamoDBAuditEntry.class));
65+
}
66+
}
67+
68+
public void ensureTableExists() {
69+
ListTablesResponse tables = client.listTables();
70+
if (!tables.tableNames().contains(tableName)) {
71+
if (DEFAULT_AUDIT_STORE_NAME.equals(tableName)) {
72+
client.createTable(CreateTableRequest.builder()
73+
.tableName(tableName)
74+
.keySchema(
75+
KeySchemaElement.builder().attributeName("partitionKey").keyType(KeyType.HASH).build()
76+
)
77+
.attributeDefinitions(
78+
AttributeDefinition.builder().attributeName("partitionKey").attributeType(ScalarAttributeType.S).build()
79+
)
80+
.provisionedThroughput(
81+
ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()
82+
)
83+
.build());
84+
} else {
85+
client.createTable(CreateTableRequest.builder()
86+
.tableName(tableName)
87+
.keySchema(
88+
KeySchemaElement.builder().attributeName("executionId").keyType(KeyType.HASH).build(),
89+
KeySchemaElement.builder().attributeName("changeId").keyType(KeyType.RANGE).build()
90+
)
91+
.attributeDefinitions(
92+
AttributeDefinition.builder().attributeName("executionId").attributeType(ScalarAttributeType.S).build(),
93+
AttributeDefinition.builder().attributeName("changeId").attributeType(ScalarAttributeType.S).build()
94+
)
95+
.provisionedThroughput(
96+
ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()
97+
)
98+
.build());
99+
}
100+
waitForTableActive();
101+
}
102+
}
103+
104+
private void waitForTableActive() {
105+
while (true) {
106+
DescribeTableResponse resp = client.describeTable(DescribeTableRequest.builder().tableName(tableName).build());
107+
if (resp.table().tableStatus() == TableStatus.ACTIVE) {
108+
break;
109+
}
110+
try {
111+
Thread.sleep(200);
112+
} catch (InterruptedException ignored) {}
113+
}
114+
}
115+
116+
public void resetTable() {
117+
ScanRequest scanRequest = ScanRequest.builder().tableName(tableName).build();
118+
ScanResponse scanResponse = client.scan(scanRequest);
119+
for (Map<String, AttributeValue> item : scanResponse.items()) {
120+
Map<String, AttributeValue> key = new HashMap<>();
121+
if (DEFAULT_AUDIT_STORE_NAME.equals(tableName)) {
122+
key.put("partitionKey", item.get("partitionKey"));
123+
} else {
124+
key.put("executionId", item.get("executionId"));
125+
key.put("changeId", item.get("changeId"));
126+
}
127+
client.deleteItem(DeleteItemRequest.builder()
128+
.tableName(tableName)
129+
.key(key)
130+
.build());
131+
}
132+
}
133+
134+
public void insertChangeEntries(List<MongockDynamoDBAuditEntry> entries) {
135+
if (DEFAULT_AUDIT_STORE_NAME.equals(tableName)) {
136+
throw new UnsupportedOperationException("insertChangeEntries is only for change log tables");
137+
}
138+
DynamoDbTable<MongockDynamoDBAuditEntry> changeTable = (DynamoDbTable<MongockDynamoDBAuditEntry>) table;
139+
for (MongockDynamoDBAuditEntry entry : entries) {
140+
changeTable.putItem(entry);
141+
}
142+
}
143+
144+
public List<AuditEntry> getAuditEntriesSorted() {
145+
if (DEFAULT_AUDIT_STORE_NAME.equals(tableName)) {
146+
DynamoDbTable<DynamoDBAuditEntryEntity> auditTable = (DynamoDbTable<DynamoDBAuditEntryEntity>) table;
147+
List<DynamoDBAuditEntryEntity> entities = new ArrayList<>();
148+
auditTable.scan().items().forEach(entities::add);
149+
return entities.stream()
150+
.map(DynamoDBAuditEntryEntity::toAuditEntry)
151+
.sorted()
152+
.collect(Collectors.toList());
153+
} else {
154+
DynamoDbTable<MongockDynamoDBAuditEntry> changeTable = (DynamoDbTable<MongockDynamoDBAuditEntry>) table;
155+
List<MongockDynamoDBAuditEntry> entries = new ArrayList<>();
156+
changeTable.scan().items().forEach(entries::add);
157+
return entries.stream()
158+
.map(MongockDynamoDBAuditEntry::toAuditEntry)
159+
.sorted()
160+
.collect(Collectors.toList());
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)