@@ -149,8 +149,10 @@ type syncState struct {
149149}
150150
151151// NewPhysicalInitialJob creates a new physical initial job.
152- func NewPhysicalInitialJob (cfg config.JobConfig , global * global.Config , engineProps global.EngineProps , cloneManager pool.FSManager ,
153- tm * telemetry.Agent ) (* PhysicalInitial , error ) {
152+ func NewPhysicalInitialJob (
153+ cfg config.JobConfig , global * global.Config , engineProps global.EngineProps , cloneManager pool.FSManager ,
154+ tm * telemetry.Agent ,
155+ ) (* PhysicalInitial , error ) {
154156 p := & PhysicalInitial {
155157 name : cfg .Spec .Name ,
156158 cloneManager : cloneManager ,
@@ -397,7 +399,13 @@ func (p *PhysicalInitial) checkSyncInstance(ctx context.Context) (string, error)
397399
398400 log .Msg ("Sync instance has been checked. It is running" )
399401
400- if err := p .checkpoint (ctx , syncContainer .ID ); err != nil {
402+ if err := tools .RunCheckpoint (
403+ ctx ,
404+ p .dockerClient ,
405+ syncContainer .ID ,
406+ p .globalCfg .Database .User (),
407+ p .globalCfg .Database .Name (),
408+ ); err != nil {
401409 return "" , errors .Wrap (err , "failed to make a checkpoint for sync instance" )
402410 }
403411
@@ -616,9 +624,8 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
616624 }
617625 }
618626
619- // Checkpoint.
620- if err := p .checkpoint (ctx , containerID ); err != nil {
621- return err
627+ if err := tools .RunCheckpoint (ctx , p .dockerClient , containerID , p .globalCfg .Database .User (), p .globalCfg .Database .Name ()); err != nil {
628+ return errors .Wrap (err , "failed to run checkpoint" )
622629 }
623630
624631 if err := cfgManager .RemoveRecoveryConfig (); err != nil {
@@ -646,7 +653,10 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string,
646653 return nil
647654}
648655
649- func (p * PhysicalInitial ) getDSAFromWAL (ctx context.Context , pgVersion float64 , containerID , cloneDir string ) (string , error ) {
656+ func (p * PhysicalInitial ) getDSAFromWAL (ctx context.Context , pgVersion float64 , containerID , cloneDir string ) (
657+ string ,
658+ error ,
659+ ) {
650660 log .Dbg (cloneDir )
651661
652662 walDirectory := walDir (cloneDir , pgVersion )
@@ -692,7 +702,12 @@ func walDir(cloneDir string, pgVersion float64) string {
692702 return path .Join (cloneDir , dir )
693703}
694704
695- func (p * PhysicalInitial ) parseWAL (ctx context.Context , containerID string , pgVersion float64 , walFilePath string ) string {
705+ func (p * PhysicalInitial ) parseWAL (
706+ ctx context.Context ,
707+ containerID string ,
708+ pgVersion float64 ,
709+ walFilePath string ,
710+ ) string {
696711 cmd := walCommand (pgVersion , walFilePath )
697712
698713 output , err := tools .ExecCommandWithOutput (ctx , p .dockerClient , containerID , types.ExecConfig {
@@ -768,7 +783,11 @@ func buildRecoveryConfig(fileConfig, userRecoveryConfig map[string]string) map[s
768783 return recoveryConf
769784}
770785
771- func (p * PhysicalInitial ) markDSA (ctx context.Context , defaultDSA , containerID , dataDir string , pgVersion float64 ) error {
786+ func (p * PhysicalInitial ) markDSA (
787+ ctx context.Context ,
788+ defaultDSA , containerID , dataDir string ,
789+ pgVersion float64 ,
790+ ) error {
772791 extractedDataStateAt , err := p .extractDataStateAt (ctx , containerID , dataDir , pgVersion , defaultDSA )
773792 if err != nil {
774793 if defaultDSA == "" {
@@ -895,8 +914,10 @@ and the source doesn't have enough activity.
895914Step 3. Use the timestamp of the latest checkpoint. This is extracted from PGDATA using the
896915pg_controldata utility. Note that this is not an exact value of the latest activity in the source
897916before we took a copy of PGDATA, but we suppose it is not far from it. */
898- func (p * PhysicalInitial ) extractDataStateAt (ctx context.Context , containerID , dataDir string , pgVersion float64 ,
899- defaultDSA string ) (string , error ) {
917+ func (p * PhysicalInitial ) extractDataStateAt (
918+ ctx context.Context , containerID , dataDir string , pgVersion float64 ,
919+ defaultDSA string ,
920+ ) (string , error ) {
900921 output , err := p .getLastXActReplayTimestamp (ctx , containerID )
901922 if err != nil {
902923 log .Dbg ("unable to get last replay timestamp from the promotion container: " , err )
@@ -1002,20 +1023,6 @@ func (p *PhysicalInitial) runPromoteCommand(ctx context.Context, containerID, cl
10021023 return nil
10031024}
10041025
1005- func (p * PhysicalInitial ) checkpoint (ctx context.Context , containerID string ) error {
1006- commandCheckpoint := []string {"psql" , "-U" , p .globalCfg .Database .User (), "-d" , p .globalCfg .Database .Name (), "-XAtc" , "checkpoint" }
1007- log .Msg ("Run checkpoint command" , commandCheckpoint )
1008-
1009- output , err := tools .ExecCommandWithOutput (ctx , p .dockerClient , containerID , types.ExecConfig {Cmd : commandCheckpoint })
1010- if err != nil {
1011- return errors .Wrap (err , "failed to make checkpoint" )
1012- }
1013-
1014- log .Msg ("Checkpoint result: " , output )
1015-
1016- return nil
1017- }
1018-
10191026func (p * PhysicalInitial ) markDatabaseData () error {
10201027 if err := p .dbMarker .CreateConfig (); err != nil {
10211028 return errors .Wrap (err , "failed to create a DBMarker config of the database" )
0 commit comments