Skip to content

Downloading images asynchronously (aiohttp)

pyproject.toml

toml
[tool.poetry.dependencies]
python = "^3.9"
aiohttp = "^3.9.1"
aiofiles = "^23.2.1"

# Aliyun PyPI mirror, served over HTTPS so packages are not fetched over
# unencrypted HTTP.
# NOTE(review): `default = true` is deprecated in newer Poetry releases in
# favour of `priority = ...`; confirm against the Poetry version in use.
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple"
default = true

run.py

python
import asyncio
import aiofiles
import aiohttp
import os
import time
import traceback


class Timer:
    """Context manager that measures how long its body takes and prints it.

    On exit the duration (seconds, as returned by ``time.monotonic``) is
    stored in ``elapsed_time`` and the instance's string form is printed.
    Exceptions raised in the body are not suppressed.
    """

    def __enter__(self):
        # Take the starting reading; the timer itself is what ``as`` binds.
        self.start_time = self.get_time()
        return self

    def get_time(self):
        """Return the current monotonic clock reading in seconds."""
        now = time.monotonic()
        return now

    def __exit__(self, exc_type, exc_val, exc_tb):
        finished = self.get_time()
        self.elapsed_time = finished - self.start_time
        print(self)
        # Falsy return value: any exception from the body keeps propagating.
        return None

    def __str__(self):
        seconds = self.elapsed_time
        return "[+] Elapsed time: " + format(seconds, ".3f") + " seconds"

class ImageDownloader:
    """Download many URLs concurrently with aiohttp and save each to disk.

    An ``asyncio.Semaphore`` bounds the number of downloads in flight to
    ``concurrency``. A failed item is reported and skipped; it does not
    abort the rest of the batch.
    """

    def __init__(self, concurrency=10):
        """Create a downloader.

        Args:
            concurrency: maximum number of simultaneous downloads. Must be
                at least 1, because a semaphore of 0 would never let a
                download start.

        Raises:
            ValueError: if ``concurrency`` is less than 1.
        """
        if concurrency < 1:
            raise ValueError(f'concurrency must be >= 1, got {concurrency!r}')
        self.concurrency = concurrency

    async def write_file(self, filename, content):
        """Write the bytes ``content`` to ``filename``, replacing any existing file."""
        async with aiofiles.open(filename, mode='wb') as f:
            await f.write(content)

    async def download_image(self, session, url, filename, index, sema):
        """Fetch ``url`` and save the response body to ``filename``.

        Args:
            session: the client session used to issue the GET request.
            url: address to download.
            filename: destination path for the body.
            index: identifier returned on success.
            sema: semaphore limiting concurrent downloads.

        Returns:
            ``index`` on success. ``None`` if the request, the HTTP status
            check or the file write failed; the traceback is printed.
        """
        async with sema:
            try:
                async with session.get(url) as response:
                    # Without this, a 404/500 error page would be written
                    # under an image filename and reported as a success.
                    response.raise_for_status()
                    content = await response.read()
                # The body is fully read, so the connection can be released
                # before the (possibly slow) disk write.
                await self.write_file(filename, content)
                return index
            except Exception:
                # Not a bare ``except``: stay best-effort for ordinary
                # errors, but let task cancellation and KeyboardInterrupt
                # propagate instead of silently swallowing them.
                print(f'[-] Failed to download #{index}: {url}')
                traceback.print_exc()
                return None

    async def worker(self, task, session, sema):
        """Unpack a task dict (``url``, ``index``, ``filename``) and download it."""
        url = task.get('url')
        index = task.get('index')
        filename = task.get('filename')

        return await self.download_image(session, url, filename, index, sema)

    async def run(self, producer):
        """Download every task yielded by the async generator ``producer()``.

        All downloads share one client session and are limited by a
        semaphore of size ``self.concurrency``.

        Returns:
            One entry per task, in production order: the task's index on
            success, or ``None`` if that download failed.
        """
        sema = asyncio.Semaphore(self.concurrency)
        async with aiohttp.ClientSession() as session:
            tasks = [self.worker(task, session, sema) async for task in producer()]
            return await asyncio.gather(*tasks)

    def start(self, producer):
        """Run :meth:`run` on a fresh event loop and print the elapsed time.

        Returns:
            The per-task result list produced by :meth:`run`.
        """
        with Timer():
            return asyncio.run(self.run(producer))

class Producer:
    """Generate download tasks for one image URL, saved into one directory."""

    DEFAULT_URL = 'http://wx2.sinaimg.cn/mw600/0089jzyPly1gqo2l5gwb7j31400u04qp.jpg'

    def __init__(self, dir_d='download/', url=DEFAULT_URL, count=1000):
        """Prepare the output directory and remember what to produce.

        Args:
            dir_d: directory the files are written to; created (with any
                missing parents) if absent.
            url: address that every produced task downloads.
            count: number of tasks to produce (indices ``0 .. count-1``).

        Raises:
            NotADirectoryError: if ``dir_d`` exists but is not a directory.
        """
        self.dir_d = self.exists_dir(dir_d)
        self.url = url
        self.count = count

    @staticmethod
    def exists_dir(dir_d):
        """Ensure ``dir_d`` exists as a directory and return it unchanged.

        Missing parent directories are created too. A message is printed
        only when something was actually created.

        Raises:
            NotADirectoryError: if ``dir_d`` exists but is not a directory.
        """
        try:
            # EAFP: avoids the race between checking and creating, and
            # unlike os.mkdir also handles nested paths.
            os.makedirs(dir_d)
        except FileExistsError:
            # Already present: fine if it is a directory. Otherwise every
            # later write into it would fail, so report it up front.
            if not os.path.isdir(dir_d):
                raise NotADirectoryError(f'{dir_d} exists and is not a directory') from None
        else:
            print(f'[+] Created directory {dir_d}')
        return dir_d

    async def producer(self):
        """Asynchronously yield ``count`` task dicts.

        Each dict has the keys ``index``, ``url`` and ``filename``, where
        ``filename`` is ``<dir_d>/<index>.jpg``.
        """
        for index in range(self.count):
            filename = os.path.join(self.dir_d, f'{index}.jpg')
            yield dict(index=index, url=self.url, filename=filename)

def main():
    """Create an ImageDownloader and a default Producer, then run the download batch."""
    ImageDownloader().start(Producer().producer)


if __name__ == '__main__':
    main()

Released under the MIT License.