Source code for pdiffcopy.client

# Fast large file synchronization inspired by rsync.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: March 6, 2020
# URL: https://pdiffcopy.readthedocs.io

"""Parallel, differential file copy client."""

# Standard library modules.
import functools
import os
import pipes
import subprocess

# External dependencies.
import requests
from humanfriendly import Timer, format_size
from humanfriendly.prompts import prompt_for_confirmation
from humanfriendly.tables import format_pretty_table
from humanfriendly.terminal import output
from humanfriendly.terminal.spinners import Spinner
from humanfriendly.text import compact, format, pluralize
from property_manager import PropertyManager, cached_property, mutable_property, set_property
from six.moves.urllib.parse import urlencode, urlparse, urlunparse
from verboselogs import VerboseLogger

# Modules included in our package.
from pdiffcopy import BLOCK_SIZE, DEFAULT_CONCURRENCY, DEFAULT_PORT
from pdiffcopy.exceptions import BenchmarkAbortedError
from pdiffcopy.hashing import compute_hashes
from pdiffcopy.mp import Promise, WorkerPool
from pdiffcopy.operations import get_file_info, read_block, resize_file, write_block

# Public identifiers that require documentation.
__all__ = ("Client", "get_hashes_fn", "Location", "logger", "transfer_block_fn")

# Initialize a logger for this module.
logger = VerboseLogger(__name__)


class Client(PropertyManager):

    """Python API for the client side of the ``pdiffcopy`` program."""

    @mutable_property
    def benchmark(self):
        """How many times the benchmark should be run (an integer, defaults to 0)."""
        return 0

    @mutable_property
    def block_size(self):
        """The block size used by the client (an integer, defaults to :data:`~pdiffcopy.BLOCK_SIZE`)."""
        return BLOCK_SIZE

    @mutable_property
    def concurrency(self):
        """The number of parallel processes that the client is allowed to start (an integer, defaults to :data:`~pdiffcopy.DEFAULT_CONCURRENCY`)."""
        return DEFAULT_CONCURRENCY

    @mutable_property
    def delta_transfer(self):
        """Whether delta transfer is enabled (a boolean, defaults to :data:`True`)."""
        return True

    @mutable_property
    def dry_run(self):
        """Whether to simulate the synchronization without making any changes (a boolean, defaults to :data:`False`)."""
        return False

    @mutable_property
    def hash_method(self):
        """The block hash method (a string, defaults to 'sha1')."""
        return "sha1"

    @mutable_property
    def source(self):
        """The :class:`Location` from which data is read."""

    @source.setter
    def source(self, value):
        """Automatically coerce :attr:`source` to a :class:`Location`."""
        set_property(self, "source", Location(expression=value))

    @mutable_property
    def target(self):
        """The :class:`Location` to which data is written."""

    @target.setter
    def target(self, value):
        """Automatically coerce :attr:`target` to a :class:`Location`."""
        set_property(self, "target", Location(expression=value))

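    # A minimal usage sketch of the client API (the host name, port number
    # and file names below are hypothetical; the setters above coerce both
    # expressions to Location objects):
    #
    #     client = Client(
    #         source="http://nas.local:8080/backups/disk.img",
    #         target="/backups/disk.img",
    #         concurrency=4,
    #     )
    #     client.synchronize()
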
    def compute_transfer_size(self, offsets):
        """
        Figure out how much data we're going to transfer.

        :param offsets: A list of integers with the offsets of the blocks to
                        be synchronized.
        :returns: The amount of data to be transferred in bytes (an integer).

        This would be trivially easy if it weren't for the last block, which
        can be smaller than the block size. Depending on the configured block
        size and the size of the file being synchronized the difference may be
        negligible or quite significant, so we go to the effort of calculating
        this correctly.
        """
        transfer_size = self.block_size * len(offsets)
        last_block_size = (self.source.file_size % self.block_size) or self.block_size
        last_block_offset = self.source.file_size - last_block_size
        if last_block_offset in offsets:
            transfer_size -= self.block_size
            transfer_size += last_block_size
        return transfer_size

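    # A worked example of the last-block correction above, using hypothetical
    # numbers (not taken from the module itself):
    #
    #     file_size  = 10 * 1024 * 1024 + 512   # 10 MiB plus 512 bytes
    #     block_size = 1024 * 1024               # 1 MiB
    #
    # Then last_block_size = file_size % block_size = 512 and
    # last_block_offset = 10 MiB. If that offset is scheduled for transfer,
    # the naive estimate of len(offsets) * 1 MiB is corrected by subtracting
    # one full block and adding back the 512 trailing bytes.
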
    def mutate_target(self, percentage):
        """Invalidate a percentage of the data in the :attr:`target` file."""
        if self.target.hostname:
            raise TypeError("Benchmark requires local target file!")
        num_bytes = self.target.file_size / 100 * percentage
        if self.target.file_size > 1024 * 1024 * 1024:
            block_size = 1024 * 1024
        else:
            block_size = 1024 * 256
        logger.notice("Mutating %i%% of the target file ..", percentage)
        with open(self.target.filename, "r+b") as handle:
            # Allocate the zeroed block only once.
            block = b"\0" * block_size
            # Write the zeroed block many times.
            for i in range(int(num_bytes / block_size)):
                handle.write(block)

    def run_benchmark(self):
        """Benchmark the effectiveness of the delta transfer implementation."""
        # Make sure the operator realizes what we're going to do, before it happens.
        if os.environ.get("PDIFFCOPY_BENCHMARK") != "allowed":
            logger.notice("Set $PDIFFCOPY_BENCHMARK=allowed to bypass the following interactive prompt.")
            question = """
                This will mutate the target file and then restore its
                original contents. Are you sure this is okay?
            """
            if not prompt_for_confirmation(compact(question), default=False):
                raise BenchmarkAbortedError("Permission to run benchmark denied.")
        samples = []
        logger.info("Performing initial synchronization to level the playing field ..")
        self.synchronize_once()
        # If the target file didn't exist before we created it then
        # self.target.exists may have cached the value False.
        self.clear_cached_properties()
        # Get the rsync configuration from environment variables.
        rsync_server = os.environ.get("PDIFFCOPY_BENCHMARK_RSYNC_SERVER")
        rsync_module = os.environ.get("PDIFFCOPY_BENCHMARK_RSYNC_MODULE")
        rsync_root = os.environ.get("PDIFFCOPY_BENCHMARK_RSYNC_ROOT")
        have_rsync = rsync_server and rsync_module and rsync_root
        # Run the benchmark for the requested number of iterations.
        for i in range(1, self.benchmark + 1):
            # Initialize resumable timers to compare pdiffcopy and rsync runtimes.
            pdiffcopy_timer = Timer(resumable=True)
            rsync_timer = Timer(resumable=True)
            # Mutate the target file.
            difference = 100 / self.benchmark * i
            self.mutate_target(difference)
            # Synchronize using pdiffcopy.
            with pdiffcopy_timer:
                num_blocks = self.synchronize_once()
            # Synchronize using rsync?
            if have_rsync:
                self.mutate_target(difference)
                with rsync_timer:
                    rsync_command_line = [
                        "rsync",
                        "--inplace",
                        format(
                            "rsync://%s/%s",
                            rsync_server,
                            os.path.join(rsync_module, os.path.relpath(self.source.filename, rsync_root)),
                        ),
                        self.target.filename,
                    ]
                    logger.info("Synchronizing changes using %s ..", " ".join(map(pipes.quote, rsync_command_line)))
                    subprocess.check_call(rsync_command_line)
                logger.info("Synchronized changes using rsync in %s ..", rsync_timer)
            # Summarize the results of this iteration.
            metrics = ["%i%%" % difference]
            metrics.append(format_size(num_blocks * self.block_size, binary=True))
            metrics.append(str(pdiffcopy_timer))
            if have_rsync:
                metrics.append(str(rsync_timer))
            samples.append(metrics)
        # Render an overview of the results in the form of a table.
        column_names = ["Delta size", "Data transferred", "Runtime of pdiffcopy"]
        if have_rsync:
            column_names.append("Runtime of rsync")
        output(format_pretty_table(samples, column_names=column_names))

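    # A hedged sketch of driving the benchmark through the Python API with
    # the optional rsync comparison enabled, based on the environment
    # variables read above (host name, module and paths are hypothetical):
    #
    #     os.environ["PDIFFCOPY_BENCHMARK"] = "allowed"
    #     os.environ["PDIFFCOPY_BENCHMARK_RSYNC_SERVER"] = "nas.local"
    #     os.environ["PDIFFCOPY_BENCHMARK_RSYNC_MODULE"] = "backups"
    #     os.environ["PDIFFCOPY_BENCHMARK_RSYNC_ROOT"] = "/backups"
    #     client = Client(
    #         source="http://nas.local:8080/backups/disk.img",
    #         target="/backups/disk.img",
    #         benchmark=5,
    #     )
    #     client.synchronize()
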
    def synchronize(self):
        """Synchronize from :attr:`source` to :attr:`target` (possibly more than once, see :attr:`benchmark`)."""
        if self.benchmark > 0:
            self.run_benchmark()
        else:
            self.synchronize_once()

    def synchronize_once(self):
        """
        Synchronize from :attr:`source` to :attr:`target`.

        :returns: The number of blocks that differed (an integer).
        """
        timer = Timer()
        if self.delta_transfer and not self.target.exists:
            logger.info("Disabling delta transfer because target file doesn't exist ..")
            self.delta_transfer = False
        if self.delta_transfer:
            logger.info("Computing similarity index for delta transfer ..")
            offsets = self.find_changes()
        else:
            logger.info("Performing whole file copy (skipping delta transfer) ..")
            offsets = range(0, self.source.file_size, self.block_size)
        if offsets:
            self.transfer_changes(offsets)
            logger.info("Synchronized changes in %s ..", timer)
        else:
            logger.info("Nothing to do! (file contents match)")
        return len(offsets)

    def find_changes(self):
        """Helper for :func:`synchronize()` to compute the similarity index."""
        timer = Timer()
        logger.info("Computing hashes using %s ..", pluralize(self.concurrency, "worker"))
        hash_opts = dict(block_size=self.block_size, concurrency=self.concurrency, method=self.hash_method)
        source_promise = Promise(target=get_hashes_fn, args=[self.source], kwargs=hash_opts)
        target_promise = Promise(target=get_hashes_fn, args=[self.target], kwargs=hash_opts)
        source_hashes = source_promise.join()
        target_hashes = target_promise.join()
        num_hits = 0
        num_misses = 0
        todo = []
        for offset in sorted(set(source_hashes) | set(target_hashes)):
            if source_hashes.get(offset) == target_hashes.get(offset):
                num_hits += 1
            else:
                num_misses += 1
                todo.append(offset)
        logger.info("Computed %i%% similarity in %s.", num_hits / ((num_hits + num_misses) / 100.0), timer)
        return todo

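    # A small worked example of the comparison above, with hypothetical
    # digests. Offsets present in only one mapping compare unequal against
    # None and are scheduled for transfer as well:
    #
    #     source_hashes = {0: "aaa", 1048576: "bbb", 2097152: "ccc"}
    #     target_hashes = {0: "aaa", 1048576: "xxx"}
    #
    # The union of offsets is {0, 1048576, 2097152}, only offset 0 matches,
    # so todo == [1048576, 2097152] and the reported similarity is 33%.
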
    def transfer_changes(self, offsets):
        """
        Helper for :func:`synchronize()` to transfer the differences.

        :param offsets: A list of integers with the byte offsets of the
                        blocks to copy from :attr:`source` to :attr:`target`.
        """
        timer = Timer()
        transfer_size = self.compute_transfer_size(offsets)
        formatted_size = format_size(transfer_size, binary=True)
        action = "download" if self.source.hostname else "upload"
        logger.info("Will %s %s totaling %s.", action, pluralize(len(offsets), "block"), formatted_size)
        if self.dry_run:
            return
        # Make sure the target file has the right size.
        if not (self.target.exists and self.source.file_size == self.target.file_size):
            self.target.resize(self.source.file_size)
        # Transfer changed blocks in parallel.
        pool = WorkerPool(
            concurrency=self.concurrency,
            generator_fn=functools.partial(iter, offsets),
            worker_fn=functools.partial(
                transfer_block_fn, block_size=self.block_size, source=self.source, target=self.target
            ),
        )
        spinner = Spinner(label="%sing changed blocks" % action.capitalize(), total=len(offsets))
        with pool, spinner:
            for i, result in enumerate(pool, start=1):
                spinner.step(progress=i)
        logger.info(
            "%sed %i blocks (%s) in %s (%s/s).",
            action.capitalize(),
            len(offsets),
            formatted_size,
            timer,
            format_size(transfer_size / timer.elapsed_time, binary=True),
        )


def get_hashes_fn(location, **options):
    """Adapter for :mod:`multiprocessing` used by :func:`Client.find_changes()`."""
    return location.get_hashes(**options)


def transfer_block_fn(offset, source, target, block_size):
    """Adapter for :mod:`multiprocessing` used by :func:`Client.transfer_changes()`."""
    target.write_block(offset, source.read_block(offset, block_size))


class Location(PropertyManager):

    """A local or remote file to be copied."""

    @cached_property
    def exists(self):
        """:data:`True` if the file exists, :data:`False` otherwise."""
        logger.info("Checking if %s exists ..", self.label)
        return bool(self.file_info)

    @mutable_property
    def expression(self):
        """The location expression (a string)."""
        if self.hostname:
            netloc = "%s:%s" % (self.hostname, self.port_number)
            return urlunparse(("http", netloc, self.filename, "", "", ""))
        else:
            return self.filename

    @expression.setter
    def expression(self, value):
        """Parse a location expression."""
        parsed_url = urlparse(value)
        if parsed_url.scheme and parsed_url.scheme != "http":
            msg = "Invalid URL scheme! (expected 'http', got %r instead)"
            raise ValueError(msg % parsed_url.scheme)
        if parsed_url.hostname:
            self.filename = parsed_url.path
            self.hostname = parsed_url.hostname
            self.port_number = parsed_url.port or DEFAULT_PORT
        else:
            self.filename = value
            self.hostname = None
            self.port_number = None
        assert self.filename is not None

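    # Illustrative examples of how the setter above parses location
    # expressions (host name and paths are hypothetical):
    #
    #     Location(expression="/backups/disk.img")
    #       -> filename="/backups/disk.img", hostname=None, port_number=None
    #     Location(expression="http://nas.local:8080/backups/disk.img")
    #       -> filename="/backups/disk.img", hostname="nas.local", port_number=8080
    #     Location(expression="http://nas.local/backups/disk.img")
    #       -> falls back to port_number=DEFAULT_PORT
    #     Location(expression="ftp://nas.local/backups/disk.img")
    #       -> raises ValueError (only 'http' is accepted)
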
    @mutable_property
    def filename(self):
        """The absolute pathname of the file to copy (a string)."""

    @mutable_property
    def hostname(self):
        """The host name of a pdiffcopy server (a string or :data:`None`)."""

    @property
    def label(self):
        """A human friendly label for the location (a string)."""
        vicinity = "remote" if self.hostname else "local"
        return "%s file %s" % (vicinity, self.filename)

    @mutable_property
    def port_number(self):
        """The port number of a pdiffcopy server (a number or :data:`None`)."""

    @cached_property
    def file_info(self):
        """A dictionary with file metadata (empty if the file doesn't exist)."""
        if self.hostname:
            request_url = self.get_url("info", filename=self.filename)
            logger.debug("Requesting %s ..", request_url)
            response = requests.get(request_url)
            if response.status_code == 404:
                return {}
            else:
                response.raise_for_status()
                return response.json()
        else:
            return get_file_info(self.filename)

    @cached_property
    def file_size(self):
        """The size of the file in bytes (an integer)."""
        logger.info("Getting size of %s ..", self.label)
        return self.file_info.get("size")

    def get_hashes(self, **options):
        """
        Get the hashes of the blocks in a file.

        :param options: See :func:`get_url()`.
        :returns: A dictionary that maps byte offsets into the file (integers)
                  to the hashes of the blocks starting at those offsets
                  (strings).
        """
        results = {}
        options.update(filename=self.filename)
        if self.hostname:
            logger.info("Requesting hashes from server ..")
            request_url = self.get_url("hashes", **options)
            logger.debug("Requesting %s ..", request_url)
            response = requests.get(request_url, stream=True)
            response.raise_for_status()
            for line in response.iter_lines(decode_unicode=True):
                offset, _, digest = line.partition("\t")
                results[int(offset)] = digest
        else:
            progress = 0
            block_size = options["block_size"]
            total = os.path.getsize(options["filename"])
            with Spinner(label="Computing hashes", total=total) as spinner:
                for offset, digest in compute_hashes(**options):
                    results[offset] = digest
                    progress += block_size
                    spinner.step(progress)
        return results

    def get_url(self, endpoint, **params):
        """
        Get the server URL for the given `endpoint`.

        :param endpoint: The name of a server side endpoint (a string).
        :param params: Any query string parameters.
        """
        return format(
            "http://{hostname}:{port}/{endpoint}?{params}",
            hostname=self.hostname,
            port=self.port_number,
            endpoint=endpoint,
            params=urlencode(params),
        )

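    # An illustrative example of a URL built by get_url() (host name and
    # path are hypothetical; urlencode() percent-encodes the slashes in
    # the filename):
    #
    #     location.get_url("blocks", filename="/backups/disk.img", offset=0)
    #       -> "http://nas.local:8080/blocks?filename=%2Fbackups%2Fdisk.img&offset=0"
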
    def read_block(self, offset, size):
        """
        Read a block of data from :attr:`filename`.

        :param offset: The byte offset where reading starts (an integer).
        :param size: The number of bytes to read (an integer).
        :returns: A byte string.
        """
        if self.hostname:
            request_url = self.get_url("blocks", filename=self.filename, offset=offset, size=size)
            logger.debug("Requesting %s ..", request_url)
            response = requests.get(request_url)
            response.raise_for_status()
            return response.content
        else:
            return read_block(self.filename, offset, size)

    def resize(self, size):
        """
        Adjust the size of :attr:`filename` to the given size.

        :param size: The new file size in bytes (an integer).
        """
        if self.hostname:
            request_url = self.get_url("resize", filename=self.filename, size=size)
            logger.debug("Posting to %s ..", request_url)
            requests.post(request_url).raise_for_status()
        else:
            resize_file(self.filename, size)

    def write_block(self, offset, data):
        """
        Write a block of data to :attr:`filename`.

        :param offset: The byte offset where writing starts (an integer).
        :param data: The byte string to write to the file.
        """
        if self.hostname:
            request_url = self.get_url("blocks", filename=self.filename, offset=offset)
            logger.debug("Posting to %s ..", request_url)
            response = requests.post(request_url, data=data)
            response.raise_for_status()
        else:
            write_block(self.filename, offset, data)

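
# A hedged sketch of copying a single block between two locations by hand,
# mirroring what transfer_block_fn() does inside the worker pool (host name
# and paths are hypothetical):
#
#     source = Location(expression="http://nas.local:8080/backups/disk.img")
#     target = Location(expression="/backups/disk.img")
#     target.write_block(0, source.read_block(0, BLOCK_SIZE))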