From 5b218e0152498551dd5a8e96acc0b32ebc6fa39a Mon Sep 17 00:00:00 2001 From: Kirill Knyazkov Date: Wed, 17 Sep 2025 16:38:48 +0700 Subject: [PATCH 1/2] Add compression of ptrack.map during backup. Compression algorithm is taken from backup state (pgBackup current). Compression itself processed using internal libraries. Compute CRC using pgFileGetCRCgz, because of compression. Add decompression of ptrack.map during restore. --- src/data.c | 45 +++++++++++++++++++++++++ src/pg_probackup.h | 5 +++ src/util.c | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+) diff --git a/src/data.c b/src/data.c index d88b06b6..807a9484 100644 --- a/src/data.c +++ b/src/data.c @@ -794,6 +794,14 @@ backup_non_data_file(pgFile *file, pgFile *prev_file, return; } + /* special treatment for global/ptrack.map */ + if (strcmp(file->name, "ptrack.map") == 0) + { + copy_ptrackmap_file(from_fullpath, FIO_DB_HOST, + to_fullpath, FIO_BACKUP_HOST, file); + return; + } + /* * If non-data file exists in previous backup * and its mtime is less than parent backup start time ... */ @@ -1383,6 +1391,43 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath, strerror(errno)); + + /* We have to decompress ptrack.map */ + if (tmp_backup->compress_alg > NONE_COMPRESS && strcmp(tmp_file->name, "ptrack.map") == 0){ + + /* do decompression */ + char *buffer; + size_t size; + + const char *errormsg = NULL; + buffer = slurpFile(to_fullpath, "", &size, false, FIO_DB_HOST); // not sure about to_location + + size_t decompressed_size = tmp_file->size * 2; + void* decompressed = pg_malloc(decompressed_size); + + int rc = do_decompress(decompressed, decompressed_size, buffer, size, tmp_backup->compress_alg, &errormsg); + + /* Something went wrong and errormsg was assigned, throw a warning */ + if (rc < 0 && errormsg != NULL) + elog(WARNING, "An error occured during compressing ptrack.map: %s", errormsg); + + /* decompression didn't worked */ + if (rc <= 0) + { + elog(ERROR, "An error occured during decompression of ptrack.map: %s", errormsg); + pg_free(buffer); + pg_free(decompressed); + } + else { + decompressed_size = rc; + } + + writePtrackMap(decompressed, decompressed_size, to_fullpath, FIO_DB_HOST); + + pg_free(decompressed); + pg_free(buffer); + } + return tmp_file->write_size; } diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 175aa558..9d2263de 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -1214,6 +1214,8 @@ extern void get_control_file_or_back_file(const char *pgdata_path, fio_location ControlFileData *control); extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location, const char *to_fullpath, fio_location to_location, pgFile *file); +extern void copy_ptrackmap_file(const char *from_fullpath, fio_location from_location, + const char *to_fullpath, fio_location to_location, pgFile *file); extern void time2iso(char *buf, size_t len, time_t time, bool utc); extern const char *status2str(BackupStatus status); @@ -1239,6 +1241,9 @@ extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeIn extern void check_system_identifiers(PGconn *conn, const char *pgdata); extern void parse_filelist_filenames(parray *files, const char *root); +extern void writePtrackMap(const char *ptrackMap, const size_t ptrackmap_size, + const char *path, fio_location location); + /* in ptrack.c */ extern void make_pagemap_from_ptrack_2(parray* files, PGconn* backup_conn, const char *ptrack_schema, diff --git a/src/util.c b/src/util.c index 019d2064..dcbb33b8 100644 --- a/src/util.c +++ b/src/util.c @@ -454,6 +454,90 @@ copy_pgcontrol_file(const char *from_fullpath, fio_location from_location, pg_free(buffer); } +/* + * Write page_map_entry to ptrackMap + */ +void +writePtrackMap(const char *ptrackMap, const size_t ptrackmap_size, + const char *path, fio_location location) +{ + int fd; + char *buffer = NULL; + + // checks? + + /* copy ptrackMap */ + buffer = pg_malloc0(ptrackmap_size); + memcpy(buffer, ptrackMap, ptrackmap_size); + + /* Write ptrackMap */ + fd = fio_open(path, + O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, location); + + if (fd < 0) { + elog(ERROR, "Failed to open file: %s", path); + } + + if (fio_write(fd, buffer, ptrackmap_size) != ptrackmap_size) { + elog(ERROR, "Failed to overwrite file: %s", path); + } + + if (fio_flush(fd) != 0) { + elog(ERROR, "Failed to sync file: %s", path); + } + + fio_close(fd); + pg_free(buffer); +} + +/* +* Copy ptrack.map file to backup. We do apply compression to this file. +*/ +void copy_ptrackmap_file(const char *from_fullpath, fio_location from_location, + const char *to_fullpath, fio_location to_location, + pgFile *file) { + char *buffer; + size_t size; + + bool missing_ok = true; + bool use_crc32c = true; + + const char *errormsg = NULL; + + buffer = slurpFile(from_fullpath, "", &size, false, from_location); + + size_t compressed_size = size; + void *compressed = pg_malloc(compressed_size); + + int rc = do_compress(compressed, compressed_size, buffer, size, + current.compress_alg, current.compress_level, &errormsg); + + /* Something went wrong and errormsg was assigned, throw a warning */ + if (rc < 0 && errormsg != NULL) + elog(WARNING, "An error occured during compressing ptrack.map: %s", + errormsg); + + /* compression didn`t worked */ + if (rc <= 0 || rc >= size) { + /* Do not compress ptrack.map */ + memcpy(compressed, buffer, size); + } else { + compressed_size = rc; + } + + writePtrackMap(compressed, compressed_size, to_fullpath, to_location); + + file->crc = pgFileGetCRCgz(to_fullpath, use_crc32c, missing_ok);; + file->size = compressed_size; + file->read_size = size; + file->write_size = compressed_size; + file->uncompressed_size = size; + file->compress_alg = current.compress_alg; + + pg_free(compressed); + pg_free(buffer); +} + /* * Parse string representation of the server version. */ From 806167a6f0ad76b12f4e2a9fc127108e919b25c9 Mon Sep 17 00:00:00 2001 From: Kirill Knyazkov Date: Thu, 9 Oct 2025 14:12:20 +0700 Subject: [PATCH 2/2] add: atomic ptrack.map write, CRC calculation only for decompressed ptrack.map, cleanups --- src/data.c | 36 +++++++++++++++++------------ src/util.c | 67 ++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/src/data.c b/src/data.c index 807a9484..f7d1243c 100644 --- a/src/data.c +++ b/src/data.c @@ -1393,37 +1393,43 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, /* We have to decompress ptrack.map */ - if (tmp_backup->compress_alg > NONE_COMPRESS && strcmp(tmp_file->name, "ptrack.map") == 0){ - + if (tmp_backup->compress_alg > NONE_COMPRESS && strcmp(tmp_file->name, "ptrack.map") == 0) + { /* do decompression */ char *buffer; size_t size; const char *errormsg = NULL; - buffer = slurpFile(to_fullpath, "", &size, false, FIO_DB_HOST); // not sure about to_location + buffer = slurpFile(to_fullpath, "", &size, false, FIO_DB_HOST); + + if (buffer == NULL) + elog(ERROR, "Failed to allocate buffer during ptrack.map decompression"); size_t decompressed_size = tmp_file->size * 2; void* decompressed = pg_malloc(decompressed_size); - int rc = do_decompress(decompressed, decompressed_size, buffer, size, tmp_backup->compress_alg, &errormsg); + if (decompressed == NULL) + { + pg_free(buffer); + elog(ERROR, "Failed to allocate decompressed buffer during ptrack.map decompression"); + } - /* Something went wrong and errormsg was assigned, throw a warning */ - if (rc < 0 && errormsg != NULL) - elog(WARNING, "An error occured during compressing ptrack.map: %s", errormsg); + int rc = do_decompress(decompressed, decompressed_size, buffer, size, tmp_backup->compress_alg, &errormsg); /* decompression didn't worked */ - if (rc <= 0) - { - elog(ERROR, "An error occured during decompression of ptrack.map: %s", errormsg); - pg_free(buffer); - pg_free(decompressed); - } - else { + if (rc <= 0 || errormsg != NULL) + elog(WARNING, "An error occurred during decompression of ptrack.map: %s", errormsg); + else decompressed_size = rc; - } writePtrackMap(decompressed, decompressed_size, to_fullpath, FIO_DB_HOST); + /* Check CRC for decompressed data */ + pg_crc32 file_crc = pgFileGetCRC(to_fullpath, true, true); + + if (file_crc != tmp_file->crc) + elog(WARNING, "CRC mismatch for uncompressed ptrack.map during decompression"); + pg_free(decompressed); pg_free(buffer); } diff --git a/src/util.c b/src/util.c index dcbb33b8..1641725f 100644 --- a/src/util.c +++ b/src/util.c @@ -463,31 +463,58 @@ writePtrackMap(const char *ptrackMap, const size_t ptrackmap_size, { int fd; char *buffer = NULL; - - // checks? + char *tmp_path = NULL; + + tmp_path = psprintf("%s.tmp",path); /* copy ptrackMap */ buffer = pg_malloc0(ptrackmap_size); + + if (buffer == NULL) + { + pfree(tmp_path); + elog(ERROR, "Failed to allocate buffer during ptrack.map write"); + } + memcpy(buffer, ptrackMap, ptrackmap_size); /* Write ptrackMap */ - fd = fio_open(path, + fd = fio_open(tmp_path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, location); if (fd < 0) { - elog(ERROR, "Failed to open file: %s", path); + pfree(tmp_path); + pg_free(buffer); + elog(ERROR, "Failed to open temp file: %s", tmp_path); } if (fio_write(fd, buffer, ptrackmap_size) != ptrackmap_size) { - elog(ERROR, "Failed to overwrite file: %s", path); + fio_close(fd); + fio_unlink(tmp_path, location); + pfree(tmp_path); + pg_free(buffer); + elog(ERROR, "Failed to write temp file: %s", tmp_path); } if (fio_flush(fd) != 0) { - elog(ERROR, "Failed to sync file: %s", path); + fio_close(fd); + fio_unlink(tmp_path, location); + pg_free(buffer); + pfree(tmp_path); + elog(ERROR, "Failed to sync temp file: %s", path); } fio_close(fd); + + if (fio_rename(tmp_path, path, location) != 0){ + fio_unlink(tmp_path, location); + pg_free(buffer); + pfree(tmp_path); + elog(ERROR, "Failed to rename temp file to: %s", path); + } + pg_free(buffer); + pfree(tmp_path); } /* @@ -506,33 +533,53 @@ void copy_ptrackmap_file(const char *from_fullpath, fio_location from_location, buffer = slurpFile(from_fullpath, "", &size, false, from_location); - size_t compressed_size = size; + if (buffer == NULL) + elog(ERROR, "Failed to allocate buffer during ptrack.map copy"); + + /* Calculate CRC of uncompressed map first */ + file->crc = pgFileGetCRC(from_fullpath, use_crc32c, missing_ok); + + size_t compressed_size = (current.compress_alg == ZLIB_COMPRESS)? compressBound(size) : size; void *compressed = pg_malloc(compressed_size); + if (compressed == NULL) + { + pg_free(buffer); + elog(ERROR, "Failed to allocate compressed buffer during ptrack.map copy"); + } + int rc = do_compress(compressed, compressed_size, buffer, size, current.compress_alg, current.compress_level, &errormsg); /* Something went wrong and errormsg was assigned, throw a warning */ if (rc < 0 && errormsg != NULL) - elog(WARNING, "An error occured during compressing ptrack.map: %s", + elog(WARNING, "An error occurred during compression of ptrack.map: %s", errormsg); + bool is_compressed = false; + /* compression didn`t worked */ if (rc <= 0 || rc >= size) { /* Do not compress ptrack.map */ memcpy(compressed, buffer, size); } else { + is_compressed = true; compressed_size = rc; } writePtrackMap(compressed, compressed_size, to_fullpath, to_location); - file->crc = pgFileGetCRCgz(to_fullpath, use_crc32c, missing_ok);; file->size = compressed_size; file->read_size = size; file->write_size = compressed_size; file->uncompressed_size = size; - file->compress_alg = current.compress_alg; + + if (is_compressed){ + file->compress_alg = current.compress_alg; + } + else { + file->compress_alg = NONE_COMPRESS; + } pg_free(compressed); pg_free(buffer);