Skip to content

Commit f109545

Browse files
kbowers-jumpripatel-fd
authored andcommitted
vinyl: finish MOVE implementation
The main changes here is the MOVE support implementation is complete (including data compression). This includes test coverage for all MOVE functionality (which in turn also gives extra coverage of multiple keys to all request tests).
1 parent afeadbc commit f109545

File tree

4 files changed

+468
-256
lines changed

4 files changed

+468
-256
lines changed

src/vinyl/fd_vinyl_case_move.c

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
/* FIXME: NEEDS COMPRESSION SUPPORT */
21
case FD_VINYL_REQ_TYPE_MOVE: {
32

43
fd_vinyl_key_t const * req_key_src = MAP_REQ_GADDR( req->key_gaddr, fd_vinyl_key_t, batch_cnt );
@@ -45,6 +44,8 @@
4544

4645
if( FD_UNLIKELY( line_idx_dst<line_cnt ) ) { /* dst is in cache */
4746

47+
FD_CRIT( line[ line_idx_dst ].ele_idx==ele_idx_dst, "corruption detected" );
48+
4849
ulong line_ctl_dst = line[ line_idx_dst ].ctl;
4950

5051
long ref_dst = fd_vinyl_line_ctl_ref( line_ctl_dst );
@@ -113,16 +114,61 @@
113114

114115
FD_CRIT( line_idx_src==ULONG_MAX, "corruption detected" );
115116

116-
ulong szc = fd_vinyl_data_szc( val_sz );
117+
/* Read the encoded pair from the bstream */
117118

118-
obj_src = fd_vinyl_data_alloc( data, szc );
119-
if( FD_UNLIKELY( !obj_src ) ) FD_LOG_CRIT(( "increase data cache size" ));
119+
ulong ctl = ele0[ ele_idx_src ].phdr.ctl;
120120

121-
phdr_src = fd_vinyl_data_obj_phdr( obj_src );
121+
int type = fd_vinyl_bstream_ctl_type ( ctl );
122+
int style = fd_vinyl_bstream_ctl_style( ctl );
123+
ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
124+
125+
FD_CRIT( type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR, "corruption detected" );
126+
FD_CRIT( (style==FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (style==FD_VINYL_BSTREAM_CTL_STYLE_LZ4), "corruption detected" );
127+
FD_CRIT( val_esz<=FD_VINYL_VAL_MAX, "corruption detected" );
122128

123-
fd_vinyl_io_read_imm( io, seq_src, phdr_src, val_sz );
129+
fd_vinyl_data_obj_t * cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
130+
if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
131+
132+
fd_vinyl_bstream_phdr_t * cphdr = fd_vinyl_data_obj_phdr( cobj );
133+
ulong cpair_sz = fd_vinyl_bstream_pair_sz( val_esz );
134+
135+
fd_vinyl_io_read_imm( io, seq_src, cphdr, cpair_sz );
124136
/* not an async read (so no read_cnt increment) */
125137

138+
/* Verify data integrity */
139+
140+
FD_ALERT( !fd_vinyl_bstream_pair_test( io_seed, seq_src, (fd_vinyl_bstream_block_t *)cphdr, cpair_sz ),
141+
"corruption detected" );
142+
143+
/* Decode the pair */
144+
145+
if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
146+
147+
FD_CRIT( val_esz==val_sz, "corruption detected" );
148+
149+
obj_src = cobj;
150+
phdr_src = cphdr;
151+
152+
} else {
153+
154+
obj_src = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_sz ) );
155+
if( FD_UNLIKELY( !obj_src ) ) FD_LOG_CRIT(( "increase data cache size" ));
156+
157+
char const * cval = (char const *)fd_vinyl_data_obj_val( cobj );
158+
char * val = (char *) fd_vinyl_data_obj_val( obj_src );
159+
if( FD_UNLIKELY( (ulong)LZ4_decompress_safe( cval, val, (int)val_esz, (int)val_sz )!=val_sz ) )
160+
FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
161+
162+
phdr_src = fd_vinyl_data_obj_phdr( obj_src );
163+
164+
phdr_src->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
165+
phdr_src->key = cphdr->key;
166+
phdr_src->info = cphdr->info;
167+
168+
fd_vinyl_data_free( data, cobj );
169+
170+
}
171+
126172
line_idx_src = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
127173

128174
ulong line_ctl_src = line[ line_idx_src ].ctl;
@@ -135,6 +181,8 @@
135181

136182
fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_src, FD_VINYL_LINE_EVICT_PRIO_LRU );
137183

184+
if( line_idx_src==line_idx_dst ) line_idx_dst = ULONG_MAX; /* Handle evict_lru evicting the dst */
185+
138186
}
139187

140188
/* At this point, pair key_src is cached but not acquired and pair
@@ -145,15 +193,18 @@
145193
location of pair key_src's meta element. So we reload if
146194
necessary. */
147195

148-
FD_ALERT( !memcmp( phdr_src, &ele0[ ele_idx_src ].phdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
196+
FD_CRIT( fd_vinyl_bstream_ctl_type( phdr_src->ctl )==fd_vinyl_bstream_ctl_type( ele0[ ele_idx_src ].phdr.ctl ),
197+
"corruption detected" );
198+
FD_CRIT( fd_vinyl_key_eq( &phdr_src->key, &ele0[ ele_idx_src ].phdr.key ), "corruption detected" );
199+
FD_CRIT( !memcmp( &phdr_src->info, &ele0[ ele_idx_src ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
149200

150-
accum_garbage_cnt += 2UL;
201+
accum_garbage_cnt += 2UL; /* old src and new move block */
151202
accum_garbage_sz += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_src ].phdr.ctl ) ) +
152203
FD_VINYL_BSTREAM_BLOCK_SZ;
153204

154205
if( FD_UNLIKELY( !err_dst ) ) {
155206

156-
accum_garbage_cnt++;
207+
accum_garbage_cnt++; /* old dst */
157208
accum_garbage_sz += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_dst ].phdr.ctl ) );
158209

159210
if( FD_UNLIKELY( line_idx_dst < line_cnt ) ) {
@@ -169,27 +220,25 @@
169220

170221
ulong ver_dst = fd_vinyl_line_ctl_ver( line_ctl_dst );
171222

223+
fd_vinyl_data_free( data, obj_dst );
224+
172225
line[ line_idx_dst ].obj = NULL;
173-
line[ line_idx_dst ].ele_idx = ULONG_MAX;
226+
line[ line_idx_dst ].ele_idx = ULONG_MAX; // ele0[ ele_idx_dst ].line_idx = ULONG_MAX; /* Technically not necessary given below */
174227
line[ line_idx_dst ].ctl = fd_vinyl_line_ctl( ver_dst+1UL, 0L );
175228

176229
fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_dst, FD_VINYL_LINE_EVICT_PRIO_LRU );
230+
}
177231

178-
fd_vinyl_data_free( data, obj_dst );
179-
180-
ulong pair_cnt = vinyl->pair_cnt;
181-
FD_CRIT( pair_cnt, "corruption detected" );
182-
vinyl->pair_cnt = pair_cnt - 1UL;
183-
184-
fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_dst ); /* See note below about atomicity for concurrent meta readers */
185-
186-
err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
187-
ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
232+
fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_dst ); /* See note below about atomicity for concurrent meta readers */
188233

189-
FD_CRIT( !err_src, "corruption detected" );
190-
/* Note: could test other fields post move too */
234+
ulong pair_cnt = vinyl->pair_cnt;
235+
FD_CRIT( pair_cnt, "corruption detected" );
236+
vinyl->pair_cnt = pair_cnt - 1UL;
191237

192-
}
238+
err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
239+
ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
240+
FD_CRIT( !err_src, "corruption detected" );
241+
/* Note: could test other fields post move too */
193242

194243
}
195244

@@ -252,6 +301,8 @@
252301
ele0[ ele_idx_dst ].phdr.ctl = phdr_src->ctl;
253302
FD_COMPILER_MFENCE();
254303

304+
line[ line_idx_src ].ele_idx = ele_idx_dst;
305+
255306
fd_vinyl_io_append_move( io, phdr_src, key_dst, NULL, 0UL );
256307
append_cnt++;
257308
accum_move_cnt++;
@@ -262,6 +313,8 @@
262313
append_cnt++;
263314
FD_CRIT( fd_vinyl_seq_eq( seq, seq_dst ), "unexpected append location" );
264315

316+
DONE( FD_VINYL_SUCCESS );
317+
265318
next_move: /* silly language restriction */;
266319

267320
# undef DONE

src/vinyl/fd_vinyl_exec.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ fd_vinyl_exec( fd_vinyl_t * vinyl ) {
611611
ulong cval_sz = fd_vinyl_bstream_ctl_sz( cpair_ctl );
612612

613613
ulong _val_sz = (ulong)LZ4_decompress_safe( cval, val, (int)cval_sz, (int)val_sz );
614-
if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompres_safe failed" ));
614+
if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
615615

616616
fd_vinyl_data_free( data, cobj );
617617

src/vinyl/fd_vinyl_recover.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -684,11 +684,13 @@ t1 = t0 + 1UL; /* Turn off parallel recovery while it is untested */
684684
seq_present can be stored in pair_max. We retry with a serial
685685
recovery if parallel recovery fails. */
686686

687+
ulong seq = fd_vinyl_io_seq_present( vinyl->io );
688+
687689
ulong rtmp[4];
688690
ulong lock[2];
689691

690692
lock[0] = 0UL;
691-
lock[1] = fd_vinyl_io_seq_present( vinyl->io );
693+
lock[1] = seq;
692694

693695
FD_MAP_REDUCE( fd_vinyl_recover_part_task, tpool,t0,t1, 0L,(long)(t1-t0), rtmp, vinyl, lock );
694696

@@ -723,7 +725,7 @@ t1 = t0 + 1UL; /* Turn off parallel recovery while it is untested */
723725

724726
fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );
725727

726-
return 0UL;
728+
return seq;
727729

728730
# endif /* FD_HAS_ATOMIC */
729731
}

0 commit comments

Comments
 (0)