22
33import com .fasterxml .jackson .core .type .TypeReference ;
44import com .scalar .db .api .Delete ;
5- import com .scalar .db .api .DeleteIf ;
6- import com .scalar .db .api .DeleteIfExists ;
75import com .scalar .db .api .Mutation ;
86import com .scalar .db .api .Put ;
9- import com .scalar .db .api .PutIf ;
10- import com .scalar .db .api .PutIfExists ;
11- import com .scalar .db .api .PutIfNotExists ;
127import com .scalar .db .api .TableMetadata ;
138import com .scalar .db .common .CoreError ;
149import com .scalar .db .common .TableMetadataManager ;
1510import com .scalar .db .exception .storage .ExecutionException ;
16- import com .scalar .db .exception .storage .NoMutationException ;
1711import com .scalar .db .exception .storage .RetriableExecutionException ;
1812import java .util .ArrayList ;
1913import java .util .Collections ;
@@ -66,94 +60,19 @@ private void mutate(
6660 String namespaceName , String tableName , String partitionKey , List <Mutation > mutations )
6761 throws ExecutionException {
6862 Map <PartitionIdentifier , String > readVersionMap = new HashMap <>();
69- Map < String , ObjectStorageRecord > partition =
63+ ObjectStoragePartition partition =
7064 getPartition (namespaceName , tableName , partitionKey , readVersionMap );
7165 for (Mutation mutation : mutations ) {
7266 if (mutation instanceof Put ) {
73- putInternal ( partition , ( Put ) mutation );
67+ partition . applyPut (( Put ) mutation , metadataManager . getTableMetadata ( mutation ) );
7468 } else {
7569 assert mutation instanceof Delete ;
76- deleteInternal ( partition , ( Delete ) mutation );
70+ partition . applyDelete (( Delete ) mutation , metadataManager . getTableMetadata ( mutation ) );
7771 }
7872 }
7973 applyPartitionWrite (namespaceName , tableName , partitionKey , partition , readVersionMap );
8074 }
8175
82- private void putInternal (Map <String , ObjectStorageRecord > partition , Put put )
83- throws ExecutionException {
84- TableMetadata tableMetadata = metadataManager .getTableMetadata (put );
85- ObjectStorageMutation mutation = new ObjectStorageMutation (put , tableMetadata );
86- if (!put .getCondition ().isPresent ()) {
87- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
88- if (existingRecord == null ) {
89- partition .put (mutation .getRecordId (), mutation .makeRecord ());
90- } else {
91- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
92- }
93- } else if (put .getCondition ().get () instanceof PutIfNotExists ) {
94- if (partition .containsKey (mutation .getRecordId ())) {
95- throw new NoMutationException (
96- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
97- }
98- partition .put (mutation .getRecordId (), mutation .makeRecord ());
99- } else if (put .getCondition ().get () instanceof PutIfExists ) {
100- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
101- if (existingRecord == null ) {
102- throw new NoMutationException (
103- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
104- }
105- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
106- } else {
107- assert put .getCondition ().get () instanceof PutIf ;
108- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
109- if (existingRecord == null ) {
110- throw new NoMutationException (
111- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
112- }
113- try {
114- validateConditions (
115- partition .get (mutation .getRecordId ()),
116- put .getCondition ().get ().getExpressions (),
117- metadataManager .getTableMetadata (mutation .getOperation ()));
118- } catch (ExecutionException e ) {
119- throw new NoMutationException (
120- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ), e );
121- }
122- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
123- }
124- }
125-
126- private void deleteInternal (Map <String , ObjectStorageRecord > partition , Delete delete )
127- throws ExecutionException {
128- TableMetadata tableMetadata = metadataManager .getTableMetadata (delete );
129- ObjectStorageMutation mutation = new ObjectStorageMutation (delete , tableMetadata );
130- if (!delete .getCondition ().isPresent ()) {
131- partition .remove (mutation .getRecordId ());
132- } else if (delete .getCondition ().get () instanceof DeleteIfExists ) {
133- if (!partition .containsKey (mutation .getRecordId ())) {
134- throw new NoMutationException (
135- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ));
136- }
137- partition .remove (mutation .getRecordId ());
138- } else {
139- assert delete .getCondition ().get () instanceof DeleteIf ;
140- if (!partition .containsKey (mutation .getRecordId ())) {
141- throw new NoMutationException (
142- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ));
143- }
144- try {
145- validateConditions (
146- partition .get (mutation .getRecordId ()),
147- delete .getCondition ().get ().getExpressions (),
148- metadataManager .getTableMetadata (mutation .getOperation ()));
149- } catch (ExecutionException e ) {
150- throw new NoMutationException (
151- CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ), e );
152- }
153- partition .remove (mutation .getRecordId ());
154- }
155- }
156-
15776 /**
15877 * Applies the partition write.
15978 *
@@ -168,13 +87,11 @@ private void applyPartitionWrite(
16887 String namespaceName ,
16988 String tableName ,
17089 String partitionKey ,
171- Map < String , ObjectStorageRecord > partition ,
90+ ObjectStoragePartition partition ,
17291 Map <PartitionIdentifier , String > readVersionMap )
17392 throws ExecutionException {
174- if (readVersionMap .containsKey (
175- PartitionIdentifier .of (namespaceName , tableName , partitionKey ))) {
176- String readVersion =
177- readVersionMap .get (PartitionIdentifier .of (namespaceName , tableName , partitionKey ));
93+ if (readVersionMap .containsKey (partition .getPartitionIdentifier ())) {
94+ String readVersion = readVersionMap .get (partition .getPartitionIdentifier ());
17895 if (!partition .isEmpty ()) {
17996 updatePartition (namespaceName , tableName , partitionKey , partition , readVersion );
18097 } else {
@@ -197,7 +114,7 @@ private void applyPartitionWrite(
197114 * @return the partition
198115 * @throws ExecutionException if a failure occurs during the operation
199116 */
200- private Map < String , ObjectStorageRecord > getPartition (
117+ private ObjectStoragePartition getPartition (
201118 String namespaceName ,
202119 String tableName ,
203120 String partitionKey ,
@@ -207,13 +124,17 @@ private Map<String, ObjectStorageRecord> getPartition(
207124 try {
208125 Optional <ObjectStorageWrapperResponse > response = wrapper .get (objectKey );
209126 if (!response .isPresent ()) {
210- return new HashMap <>();
127+ return ObjectStoragePartition .newBuilder ()
128+ .namespaceName (namespaceName )
129+ .tableName (tableName )
130+ .partitionKey (partitionKey )
131+ .build ();
211132 }
212- readVersionMap . put (
213- PartitionIdentifier . of ( namespaceName , tableName , partitionKey ),
214- response .get ().getVersion () );
215- return Serializer . deserialize (
216- response . get (). getPayload (), new TypeReference < Map < String , ObjectStorageRecord >>() {}) ;
133+ ObjectStoragePartition partition =
134+ Serializer . deserialize (
135+ response .get ().getPayload (), new TypeReference < ObjectStoragePartition >() {} );
136+ readVersionMap . put ( partition . getPartitionIdentifier (), response . get (). getVersion ());
137+ return partition ;
217138 } catch (ObjectStorageWrapperException e ) {
218139 throw new ExecutionException (
219140 CoreError .OBJECT_STORAGE_ERROR_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
@@ -231,10 +152,7 @@ private Map<String, ObjectStorageRecord> getPartition(
231152 * @throws ExecutionException if a failure occurs during the operation
232153 */
233154 private void insertPartition (
234- String namespaceName ,
235- String tableName ,
236- String partitionKey ,
237- Map <String , ObjectStorageRecord > partition )
155+ String namespaceName , String tableName , String partitionKey , ObjectStoragePartition partition )
238156 throws ExecutionException {
239157 try {
240158 wrapper .insert (
@@ -264,7 +182,7 @@ private void updatePartition(
264182 String namespaceName ,
265183 String tableName ,
266184 String partitionKey ,
267- Map < String , ObjectStorageRecord > partition ,
185+ ObjectStoragePartition partition ,
268186 String readVersion )
269187 throws ExecutionException {
270188 try {
0 commit comments