Source code for parfive.downloader

import os
import sys
import asyncio
import pathlib
import warnings
import contextlib
import urllib.parse
from functools import partial
from concurrent.futures import ThreadPoolExecutor

import aiohttp
from tqdm import tqdm, tqdm_notebook

import parfive
from .results import Results
from .utils import (
    FailedDownload,
    Token,
    _QueueList,
    default_name,
    get_filepath,
    get_ftp_size,
    get_http_size,
    in_notebook,
    run_in_thread,
)

try:
    import aioftp
except ImportError:  # pragma: nocover
    aioftp = None


__all__ = ['Downloader']


class Downloader:
    """
    Download files in parallel.

    Parameters
    ----------
    max_conn : `int`, optional
        The number of parallel download slots.
    progress : `bool`, optional
        If `True` show a main progress bar showing how many of the total files
        have been downloaded. If `False`, no progress bars will be shown at all.
    file_progress : `bool`, optional
        If `True` and ``progress`` is `True`, show ``max_conn`` progress bars
        detailing the progress of each individual file being downloaded.
    loop : `asyncio.AbstractEventLoop`, optional
        No longer used, and will be removed in a future release.
    notebook : `bool`, optional
        If `True` tqdm will be used in notebook mode. If `None` an attempt will
        be made to detect the notebook and guess which progress bar to use.
    overwrite : `bool` or `str`, optional
        Determine how to handle downloading if a file already exists with the
        same name. If `False` the file download will be skipped and the path
        returned to the existing file, if `True` the file will be downloaded
        and the existing file will be overwritten, if `'unique'` the filename
        will be modified to be unique.
    headers : `dict`
        Request headers to be passed to the server. Adds `User-Agent`
        information about `parfive`, `aiohttp` and `python` if not passed
        explicitly.
    """

    def __init__(self, max_conn=5, progress=True, file_progress=True,
                 loop=None, notebook=None, overwrite=False, headers=None):
        if loop:
            warnings.warn('The loop argument is no longer used, and will be '
                          'removed in a future release.')

        self.max_conn = max_conn
        self._init_queues()

        # Configure progress bars
        if notebook is None:
            notebook = in_notebook()
        self.progress = progress
        self.file_progress = file_progress if self.progress else False
        self.tqdm = tqdm if not notebook else tqdm_notebook

        self.overwrite = overwrite

        self.headers = headers
        if headers is None or 'User-Agent' not in headers:
            self.headers = {
                'User-Agent': f"parfive/{parfive.__version__} aiohttp/{aiohttp.__version__} python/{sys.version[:5]}"}

    def _init_queues(self):
        # Setup queues
        self.http_queue = _QueueList()
        self.ftp_queue = _QueueList()

    def _generate_tokens(self):
        # Create a Queue with max_conn tokens
        queue = asyncio.Queue(maxsize=self.max_conn)
        for i in range(self.max_conn):
            queue.put_nowait(Token(i + 1))
        return queue

    @property
    def queued_downloads(self):
        """
        The total number of files already queued for download.
        """
        return len(self.http_queue) + len(self.ftp_queue)
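    # The constructor options above map directly onto typical usage; a short
    # sketch (illustrative only, not part of the module):
    #
    #     dl = Downloader(max_conn=10, progress=True, overwrite='unique',
    #                     headers={'User-Agent': 'my-tool/1.0'})
    #     dl.queued_downloads   # -> 0 until files are enqueued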
    def enqueue_file(self, url, path=None, filename=None, overwrite=None, **kwargs):
        """
        Add a file to the download queue.

        Parameters
        ----------
        url : `str`
            The URL to retrieve.
        path : `str`, optional
            The directory to retrieve the file into, if `None` defaults to the
            current directory.
        filename : `str` or `callable`, optional
            The filename to save the file as. Can also be a callable which
            takes two arguments, the url and the response object from opening
            that URL, and returns the filename. (Note, for FTP downloads the
            response will be ``None``.) If `None` the HTTP headers will be read
            for the filename, or the last segment of the URL will be used.
        overwrite : `bool` or `str`, optional
            Determine how to handle downloading if a file already exists with
            the same name. If `False` the file download will be skipped and the
            path returned to the existing file, if `True` the file will be
            downloaded and the existing file will be overwritten, if
            `'unique'` the filename will be modified to be unique. If `None`
            the value set when constructing the `~parfive.Downloader` object
            will be used.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`
            or `aioftp.Client.context` depending on the protocol.

        Notes
        -----
        The proxy URL is read from the environment variables ``HTTP_PROXY`` or
        ``HTTPS_PROXY``, depending on the protocol of the ``url`` passed.
        Proxy authentication (``proxy_auth``) should be passed as an
        `aiohttp.BasicAuth` object. Proxy headers (``proxy_headers``) should be
        passed as a `dict`.
        """
        overwrite = overwrite or self.overwrite

        if path is None and filename is None:
            raise ValueError("Either path or filename must be specified.")
        elif path is None:
            path = './'

        path = pathlib.Path(path)
        if not filename:
            filepath = partial(default_name, path)
        elif callable(filename):
            filepath = filename
        else:
            # Define a function because get_file expects a callback
            def filepath(*args):
                return path / filename

        scheme = urllib.parse.urlparse(url).scheme

        if scheme in ('http', 'https'):
            get_file = partial(self._get_http, url=url, filepath_partial=filepath,
                               overwrite=overwrite, **kwargs)
            self.http_queue.append(get_file)
        elif scheme == 'ftp':
            if aioftp is None:
                raise ValueError("The aioftp package must be installed to download over FTP.")
            get_file = partial(self._get_ftp, url=url, filepath_partial=filepath,
                               overwrite=overwrite, **kwargs)
            self.ftp_queue.append(get_file)
        else:
            raise ValueError("URL must start with either 'http' or 'ftp'.")
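    # Sketch of the two filename modes described above (illustrative only; the
    # URLs and directory are hypothetical):
    #
    #     dl.enqueue_file("http://data.example.com/a.fits", path="./data")
    #     # A callable filename receives (resp, url), matching how _get_http
    #     # below calls it, and should return the full filepath:
    #     dl.enqueue_file("http://data.example.com/b.fits",
    #                     filename=lambda resp, url: "./data/renamed_b.fits")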
    @staticmethod
    def _run_in_loop(coro):
        """
        Detect an existing, running loop and run in a separate loop if needed.

        If no loop is running, use asyncio.run to run the coroutine instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            aio_pool = ThreadPoolExecutor(1)
            new_loop = asyncio.new_event_loop()
            return run_in_thread(aio_pool, new_loop, coro)

        return asyncio.run(coro)
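    # This is what lets the synchronous ``download`` method below be called
    # from an environment where an event loop is already running (for example
    # a Jupyter notebook cell): the coroutine is then executed on a fresh loop
    # in a worker thread rather than via asyncio.run.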
    async def run_download(self, timeouts=None):
        """
        Download all files in the queue.

        Parameters
        ----------
        timeouts : `dict`, optional
            Overrides for the default timeouts for http downloads. Supported
            keys are any accepted by the `aiohttp.ClientTimeout` class.
            Defaults to 5 minutes for total session timeout and 90 seconds for
            socket read timeout.

        Returns
        -------
        `parfive.Results`
            A list of files downloaded.

        Notes
        -----
        The defaults for the ``'total'`` and ``'sock_read'`` timeouts can be
        overridden by two environment variables ``PARFIVE_TOTAL_TIMEOUT`` and
        ``PARFIVE_SOCK_READ_TIMEOUT``.
        """
        timeouts = timeouts or {"total": os.environ.get("PARFIVE_TOTAL_TIMEOUT", 5 * 60),
                                "sock_read": os.environ.get("PARFIVE_SOCK_READ_TIMEOUT", 90)}

        total_files = self.queued_downloads

        done = set()
        with self._get_main_pb(total_files) as main_pb:
            if len(self.http_queue):
                done.update(await self._run_http_download(main_pb, timeouts))
            if len(self.ftp_queue):
                done.update(await self._run_ftp_download(main_pb, timeouts))

        dl_results = await asyncio.gather(*done, return_exceptions=True)

        results = Results()

        # Iterate through the results and store any failed download errors in
        # the errors list of the results object.
        for res in dl_results:
            if isinstance(res, FailedDownload):
                results.add_error(res.filepath_partial, res.url, res.exception)
            elif isinstance(res, Exception):
                raise res
            else:
                results.append(res)

        return results
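    # From already-async code the coroutine form can be awaited directly; a
    # sketch (illustrative only, URL hypothetical):
    #
    #     async def fetch():
    #         dl = Downloader()
    #         dl.enqueue_file("http://data.example.com/a.fits", path="./data")
    #         return await dl.run_download(timeouts={"total": 600, "sock_read": 120})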
    def download(self, timeouts=None):
        """
        Download all files in the queue.

        Parameters
        ----------
        timeouts : `dict`, optional
            Overrides for the default timeouts for http downloads. Supported
            keys are any accepted by the `aiohttp.ClientTimeout` class.
            Defaults to 5 minutes for total session timeout and 90 seconds for
            socket read timeout.

        Returns
        -------
        `parfive.Results`
            A list of files downloaded.

        Notes
        -----
        This is a synchronous version of `~parfive.Downloader.run_download`;
        an `asyncio` event loop will be created to run the download (in its
        own thread if a loop is already running).

        The defaults for the ``'total'`` and ``'sock_read'`` timeouts can be
        overridden by two environment variables ``PARFIVE_TOTAL_TIMEOUT`` and
        ``PARFIVE_SOCK_READ_TIMEOUT``.
        """
        return self._run_in_loop(self.run_download(timeouts))
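    # Synchronous counterpart of ``run_download`` above; a sketch
    # (illustrative only, URL and directory hypothetical):
    #
    #     dl = Downloader(progress=False)
    #     dl.enqueue_file("http://data.example.com/a.fits", path="./data")
    #     files = dl.download(timeouts={"total": 600})
    #     print(files)   # a parfive.Results list of file paths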
    def retry(self, results):
        """
        Retry any failed downloads in a results object.

        .. note::
            This will start a new event loop.

        Parameters
        ----------
        results : `parfive.Results`
            A previous results object; the ``.errors`` property will be read
            and the downloads retried.

        Returns
        -------
        `parfive.Results`
            A modified version of the input ``results`` with all the errors
            from this download attempt and any new files appended to the list
            of file paths.
        """
        # Reset the queues
        self._init_queues()

        for err in results.errors:
            self.enqueue_file(err.url, filename=err.filepath_partial)

        new_res = self.download()

        results += new_res
        results._errors = new_res._errors

        return results
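    # Retry sketch (illustrative only): any URLs recorded in
    # ``results.errors`` are re-queued and downloaded again.
    #
    #     results = dl.download()
    #     if results.errors:
    #         results = dl.retry(results)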
    def _get_main_pb(self, total):
        """
        Return the tqdm instance if we want it, else return a contextmanager
        that just returns None.
        """
        if self.progress:
            return self.tqdm(total=total, unit='file',
                             desc="Files Downloaded", position=0)
        else:
            return contextlib.contextmanager(lambda: iter([None]))()

    async def _run_http_download(self, main_pb, timeouts):
        async with aiohttp.ClientSession(headers=self.headers) as session:
            self._generate_tokens()
            futures = await self._run_from_queue(
                self.http_queue.generate_queue(), self._generate_tokens(),
                main_pb, session=session, timeouts=timeouts)

            # Wait for all the coroutines to finish
            done, _ = await asyncio.wait(futures)

            return done

    async def _run_ftp_download(self, main_pb, timeouts):
        futures = await self._run_from_queue(
            self.ftp_queue.generate_queue(), self._generate_tokens(),
            main_pb, timeouts=timeouts)
        # Wait for all the coroutines to finish
        done, _ = await asyncio.wait(futures)

        return done

    async def _run_from_queue(self, queue, tokens, main_pb, *, session=None, timeouts):
        futures = []
        while not queue.empty():
            get_file = await queue.get()
            token = await tokens.get()
            file_pb = self.tqdm if self.file_progress else False
            future = asyncio.ensure_future(get_file(session, token=token,
                                                    file_pb=file_pb,
                                                    timeouts=timeouts))

            def callback(token, future, main_pb):
                tokens.put_nowait(token)
                # Update the main progressbar
                if main_pb and not future.exception():
                    main_pb.update(1)

            future.add_done_callback(partial(callback, token, main_pb=main_pb))
            futures.append(future)

        return futures

    async def _get_http(self, session, *, url, filepath_partial, chunksize=100,
                        file_pb=None, token, overwrite, timeouts, max_splits=5,
                        **kwargs):
        """
        Read the file from the given url into the filename given by
        ``filepath_partial``.

        Parameters
        ----------
        session : `aiohttp.ClientSession`
            The `aiohttp.ClientSession` to use to retrieve the files.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url to.
            Takes two arguments ``resp, url``.
        chunksize : `int`
            The number of bytes to read into the file at a time.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        max_splits : `int`
            Maximum number of concurrent connections per file.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`.

        Returns
        -------
        `str`
            The name of the file saved.
        """
        timeout = aiohttp.ClientTimeout(**timeouts)
        try:
            scheme = urllib.parse.urlparse(url).scheme
            if 'HTTP_PROXY' in os.environ and scheme == 'http':
                kwargs['proxy'] = os.environ['HTTP_PROXY']
            elif 'HTTPS_PROXY' in os.environ and scheme == 'https':
                kwargs['proxy'] = os.environ['HTTPS_PROXY']

            async with session.get(url, timeout=timeout, **kwargs) as resp:
                if resp.status != 200:
                    raise FailedDownload(filepath_partial, url, resp)
                else:
                    filepath, skip = get_filepath(filepath_partial(resp, url), overwrite)
                    if skip:
                        return str(filepath)
                    if callable(file_pb):
                        file_pb = file_pb(position=token.n, unit='B',
                                          unit_scale=True, desc=filepath.name,
                                          leave=False, total=get_http_size(resp))
                    else:
                        file_pb = None

                    # This queue will contain the downloaded chunks and their
                    # offsets as tuples: (offset, chunk)
                    downloaded_chunk_queue = asyncio.Queue()

                    download_workers = []
                    writer = asyncio.create_task(
                        self._write_worker(downloaded_chunk_queue, file_pb, filepath))

                    if max_splits and resp.headers.get('Accept-Ranges', None) == "bytes":
                        content_length = int(resp.headers['Content-length'])
                        split_length = content_length // max_splits
                        ranges = [
                            [start, start + split_length]
                            for start in range(0, content_length, split_length)
                        ]
                        # let the last part download everything
                        ranges[-1][1] = ''

                        for _range in ranges:
                            download_workers.append(
                                asyncio.create_task(self._http_download_worker(
                                    session, url, chunksize, _range, timeout,
                                    downloaded_chunk_queue, **kwargs
                                ))
                            )
                    else:
                        download_workers.append(
                            asyncio.create_task(self._http_download_worker(
                                session, url, chunksize, None, timeout,
                                downloaded_chunk_queue, **kwargs
                            ))
                        )

                    # run all the download workers
                    await asyncio.gather(*download_workers)
                    # join() waits till all the items in the queue have been processed
                    await downloaded_chunk_queue.join()
                    writer.cancel()
                    return str(filepath)
        except Exception as e:
            raise FailedDownload(filepath_partial, url, e)

    async def _write_worker(self, queue, file_pb, filepath):
        """
        Worker for writing downloaded chunks to the file.

        Each downloaded chunk is put into an asyncio Queue by a download
        worker. This worker gets the chunk from the queue and writes it to the
        file at the chunk's specified offset.

        Parameters
        ----------
        queue : `asyncio.Queue`
            Queue for chunks.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        filepath : `pathlib.Path`
            Path to which the file should be downloaded.
        """
        with open(filepath, 'wb') as f:
            while True:
                offset, chunk = await queue.get()

                f.seek(offset)
                f.write(chunk)
                f.flush()

                # Update the progressbar for the file
                if file_pb is not None:
                    file_pb.update(len(chunk))

                queue.task_done()

    async def _http_download_worker(self, session, url, chunksize, http_range,
                                    timeout, queue, **kwargs):
        """
        Worker for downloading chunks from http urls.

        This function downloads the chunk from the specified http range and
        puts the chunk in the asyncio Queue. If no range is specified, the
        whole file is downloaded in chunks and put in the queue.

        Parameters
        ----------
        session : `aiohttp.ClientSession`
            The `aiohttp.ClientSession` to use to retrieve the files.
        url : `str`
            The url to retrieve.
        chunksize : `int`
            The number of bytes to read into the file at a time.
        http_range : (`int`, `int`) or `None`
            Start and end bytes of the file. If `None`, no ``Range`` header is
            specified in the request and the whole file will be downloaded.
        queue : `asyncio.Queue`
            Queue to put the downloaded chunks in.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`.
        """
        headers = kwargs.pop('headers', {})
        if http_range:
            headers['Range'] = 'bytes={}-{}'.format(*http_range)
            # init offset to start of range
            offset, _ = http_range
        else:
            offset = 0

        async with session.get(url, timeout=timeout, headers=headers, **kwargs) as resp:
            while True:
                chunk = await resp.content.read(chunksize)
                if not chunk:
                    break
                await queue.put((offset, chunk))
                offset += len(chunk)

    async def _get_ftp(self, session=None, *, url, filepath_partial, file_pb=None,
                       token, overwrite, timeouts, **kwargs):
        """
        Read the file from the given url into the filename given by
        ``filepath_partial``.

        Parameters
        ----------
        session : `None`
            A placeholder for API compatibility with ``_get_http``.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url to.
            Takes two arguments ``resp, url``.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        kwargs : `dict`
            Extra keyword arguments are passed to `~aioftp.Client.context`.

        Returns
        -------
        `str`
            The name of the file saved.
        """
        parse = urllib.parse.urlparse(url)
        try:
            async with aioftp.Client.context(parse.hostname, **kwargs) as client:
                if parse.username and parse.password:
                    await client.login(parse.username, parse.password)

                # This has to be done before we start streaming the file:
                total_size = await get_ftp_size(client, parse.path)
                async with client.download_stream(parse.path) as stream:
                    filepath, skip = get_filepath(filepath_partial(None, url), overwrite)
                    if skip:
                        return str(filepath)
                    if callable(file_pb):
                        file_pb = file_pb(position=token.n, unit='B',
                                          unit_scale=True, desc=filepath.name,
                                          leave=False, total=total_size)
                    else:
                        file_pb = None

                    downloaded_chunks_queue = asyncio.Queue()
                    download_workers = []
                    writer = asyncio.create_task(
                        self._write_worker(downloaded_chunks_queue, file_pb, filepath))

                    download_workers.append(
                        asyncio.create_task(self._ftp_download_worker(
                            stream, downloaded_chunks_queue))
                    )

                    await asyncio.gather(*download_workers)
                    await downloaded_chunks_queue.join()

                    writer.cancel()
                    return str(filepath)
        except Exception as e:
            raise FailedDownload(filepath_partial, url, e)

    async def _ftp_download_worker(self, stream, queue):
        """
        Similar to `Downloader._http_download_worker`.

        See that function's documentation for more info.

        Parameters
        ----------
        stream : `aioftp.StreamIO`
            Stream of the file to be downloaded.
        queue : `asyncio.Queue`
            Queue to put the downloaded chunks in.
        """
        offset = 0
        async for chunk in stream.iter_by_block():
            # Write this chunk to the output file.
            await queue.put((offset, chunk))
            offset += len(chunk)