|
| 1 | +import os |
| 2 | +import logging |
| 3 | + |
| 4 | +from select import select |
| 5 | +from subprocess import Popen, PIPE, call |
| 6 | + |
| 7 | +from mongodb_consistent_backup.Errors import OperationError |
| 8 | +from mongodb_consistent_backup.Pipeline import Task |
| 9 | + |
| 10 | + |
| 11 | +class Zbackup(Task): |
| 12 | + def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): |
| 13 | + super(Zbackup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) |
| 14 | + self.backup_name = self.config.backup.name |
| 15 | + self.backup_time = os.path.basename(self.backup_dir) |
| 16 | + self.zbackup_binary = self.config.archive.zbackup.binary |
| 17 | + self.zbackup_cache_mb = self.config.archive.zbackup.cache_mb |
| 18 | + self.zbackup_passwd_file = self.config.archive.zbackup.password_file |
| 19 | + |
| 20 | + if self.config.archive.zbackup.threads and self.config.archive.zbackup.threads > 0: |
| 21 | + self.threads(self.config.archive.zbackup.threads) |
| 22 | + |
| 23 | + # only lzma compression supported (for now) |
| 24 | + self.compression_method = 'lzma' |
| 25 | + self.compression_supported = ['lzma'] |
| 26 | + |
| 27 | + self.zbackup_dir = os.path.join(self.config.backup.location, self.backup_name, "mongodb-consistent-backup_zbackup") |
| 28 | + self.zbackup_backups = os.path.join(self.zbackup_dir, "backups") |
| 29 | + self.zbackup_backup_path = os.path.join(self.zbackup_backups, "%s.tar" % self.backup_time) |
| 30 | + self.zbackup_bundles = os.path.join(self.zbackup_dir, "bundles") |
| 31 | + self.zbackup_info = os.path.join(self.zbackup_dir, "info") |
| 32 | + self.backup_meta_dir = "mongodb-consistent-backup_META" |
| 33 | + |
| 34 | + self.encrypted = False |
| 35 | + self._zbackup = None |
| 36 | + self._tar = None |
| 37 | + self._version = None |
| 38 | + |
| 39 | + self.init() |
| 40 | + |
| 41 | + def is_zbackup_init(self): |
| 42 | + if os.path.isfile(self.zbackup_info) and os.path.isdir(self.zbackup_backups) and os.path.isdir(self.zbackup_bundles): |
| 43 | + return True |
| 44 | + return False |
| 45 | + |
| 46 | + def init(self): |
| 47 | + if os.path.isdir(self.zbackup_dir): |
| 48 | + if self.is_zbackup_init(): |
| 49 | + logging.info("Found existing ZBackup storage dir at: %s (encrypted: %s)" % (self.zbackup_dir, self.encrypted)) |
| 50 | + else: |
| 51 | + raise OperationError("ZBackup dir: %s is not a zbackup storage directory!" % self.zbackup_dir) |
| 52 | + else: |
| 53 | + try: |
| 54 | + cmd_line = [self.zbackup_binary] |
| 55 | + if self.zbackup_passwd_file: |
| 56 | + cmd_line.extend(["--password-file", self.zbackup_passwd_file, "init", self.zbackup_dir]) |
| 57 | + logging.info("Using ZBackup AES encryption with password file: %s" % self.zbackup_passwd_file) |
| 58 | + self.encrypted = True |
| 59 | + else: |
| 60 | + cmd_line.extend(["--non-encrypted", "init", self.zbackup_dir]) |
| 61 | + logging.warning("Initializing new ZBackup storage directory at: %s (encrypted: %s)" % (self.zbackup_dir, self.encrypted)) |
| 62 | + logging.debug("Using ZBackup command: '%s'" % cmd_line) |
| 63 | + exit_code = call(cmd_line) |
| 64 | + if exit_code != 0: |
| 65 | + raise OperationError("ZBackup initialization failed! Exit code: %i" % exit_code) |
| 66 | + except Exception, e: |
| 67 | + raise OperationError("Error creating ZBackup storage directory! Error: %s" % e) |
| 68 | + |
| 69 | + def version(self): |
| 70 | + if self._version: |
| 71 | + return self._version |
| 72 | + else: |
| 73 | + try: |
| 74 | + cmd = Popen([self.zbackup_binary, "--help"], stderr=PIPE) |
| 75 | + stdout, stderr = cmd.communicate() |
| 76 | + if stderr: |
| 77 | + line = stderr.split("\n")[0] |
| 78 | + if line.startswith("ZBackup") and "version " in line: |
| 79 | + fields = line.split(" ") |
| 80 | + version = fields[len(fields) - 1] |
| 81 | + if len(version.split(".")) == 3: |
| 82 | + self._version = version |
| 83 | + return self._version |
| 84 | + return None |
| 85 | + except OSError, e: |
| 86 | + return None |
| 87 | + except Exception, e: |
| 88 | + raise OperationError("Could not gather ZBackup version: %s" % e) |
| 89 | + |
| 90 | + def has_zbackup(self): |
| 91 | + if self.version(): |
| 92 | + return True |
| 93 | + return False |
| 94 | + |
| 95 | + def close(self, exit_code=None, frame=None): |
| 96 | + del exit_code |
| 97 | + del frame |
| 98 | + if not self.stopped: |
| 99 | + if self._zbackup and self._zbackup.poll() == None: |
| 100 | + logging.debug("Stopping running ZBackup command") |
| 101 | + self._zbackup.terminate() |
| 102 | + if self._tar and self._tar.poll() == None: |
| 103 | + logging.debug("Stopping running ZBackup tar command") |
| 104 | + self._tar.terminate() |
| 105 | + self.stopped = True |
| 106 | + |
| 107 | + def poll(self, timeout=1): |
| 108 | + try: |
| 109 | + poll = select([self._zbackup.stderr.fileno()], [], [], timeout) |
| 110 | + except ValueError: |
| 111 | + return |
| 112 | + if len(poll) >= 1: |
| 113 | + for fd in poll[0]: |
| 114 | + line = self._zbackup.stderr.readline() |
| 115 | + if line: |
| 116 | + logging.info(line.rstrip()) |
| 117 | + |
| 118 | + def wait(self): |
| 119 | + try: |
| 120 | + tar_done = False |
| 121 | + while self._zbackup.stderr and self._tar.stderr: |
| 122 | + self.poll() |
| 123 | + if tar_done: |
| 124 | + self._zbackup.communicate() |
| 125 | + if self._zbackup.poll() != None: |
| 126 | + logging.info("ZBackup completed successfully with exit code: %i" % self._zbackup.returncode) |
| 127 | + if self._zbackup.returncode != 0: |
| 128 | + raise OperationError("ZBackup exited with code: %i!" % self._zbackup.returncode) |
| 129 | + break |
| 130 | + elif self._tar.poll() != None: |
| 131 | + if self._tar.returncode == 0: |
| 132 | + logging.debug("ZBackup tar command completed successfully with exit code: %i" % self._tar.returncode) |
| 133 | + tar_done = True |
| 134 | + else: |
| 135 | + raise OperationError("ZBackup archiving failed on tar command with exit code: %i" % self._tar.returncode) |
| 136 | + except Exception, e: |
| 137 | + raise OperationError("Error reading ZBackup output: %s" % e) |
| 138 | + |
| 139 | + def get_commands(self, base_dir, sub_dir): |
| 140 | + tar = ["tar", "--remove-files", "-C", base_dir, "-c", sub_dir] |
| 141 | + zbackup = [self.zbackup_binary, "--cache-size", "%imb" % self.zbackup_cache_mb, "--compression", self.compression()] |
| 142 | + zbackup_path = os.path.join(self.zbackup_backups, "%s.%s.tar" % (self.backup_time, sub_dir)) |
| 143 | + if self.encrypted: |
| 144 | + zbackup.extend(["--password-file", self.zbackup_passwd_file, "backup", zbackup_path]) |
| 145 | + else: |
| 146 | + zbackup.extend(["--non-encrypted", "backup", zbackup_path]) |
| 147 | + return tar, zbackup |
| 148 | + |
| 149 | + def run(self): |
| 150 | + if self.has_zbackup(): |
| 151 | + try: |
| 152 | + logging.info("Starting ZBackup version: %s (options: compression=%s, encryption=%s, threads=%i, cache_mb=%i)" % |
| 153 | + (self.version(), self.compression(), self.encrypted, self.threads(), self.zbackup_cache_mb) |
| 154 | + ) |
| 155 | + self.running = True |
| 156 | + try: |
| 157 | + for sub_dir in os.listdir(self.backup_dir): |
| 158 | + if sub_dir == self.backup_meta_dir: |
| 159 | + continue |
| 160 | + logging.info("Running ZBackup for path: %s" % os.path.join(self.backup_dir, sub_dir)) |
| 161 | + tar_cmd, zbkp_cmd = self.get_commands(self.backup_dir, sub_dir) |
| 162 | + logging.debug("Running ZBackup tar command: %s" % tar_cmd) |
| 163 | + logging.debug("Running ZBackup command: %s" % zbkp_cmd) |
| 164 | + self._zbackup = Popen(zbkp_cmd, stdin=PIPE, stderr=PIPE) |
| 165 | + self._tar = Popen(tar_cmd, stdout=self._zbackup.stdin, stderr=PIPE) |
| 166 | + self.wait() |
| 167 | + except Exception, e: |
| 168 | + raise OperationError("Could not execute ZBackup: %s" % e) |
| 169 | + logging.info("Completed running all ZBackups") |
| 170 | + self.completed = True |
| 171 | + finally: |
| 172 | + self.running = False |
| 173 | + self.stopped = True |
| 174 | + else: |
| 175 | + raise OperationError("Cannot find ZBackup at %s!" % self.zbackup_binary) |
0 commit comments