Skip to main content

ETL 工具

项目描述

中文介绍

介绍

spparser 的目标是提供一种简洁高效的方式来读取、写入和处理文本数据。同时支持同步和异步读写文件,支持正则、xpath、css选择器提取数据。未来将实现对数据库的读写支持,并引入NLP,提供更灵活的处理方式。架构图如下:
家沟

AsyncReader 和 AsyncWriter 的灵感来自 @zpoint 的idataapi_transform

安装

pip3 install spparser

快速开始

from spparser import Reader, Writer, Extractor

def main():
    data = Reader.read_csv(file_path="./example.csv", each_line_type="dict", max_read_lines=10)
    '''
    example.csv:
    field1,field2
    1,2
    3,4
    5,6
    '''
    '''
    read_csv result: data = [{'a': '122github', 'b': '2'}, {'a': '-8spparser999', 'b': '4'}]
    '''
    alist = []
    for item in data:
        res = Extractor.regex(r"[a-zA-Z]+", item["a"], flags=0, trim_mode=True, return_all=False)
        alist.append(res)
    '''
    alist = ["github","spparser"]
    '''
    Writer.write(alist, "result.json")

if __name__ == "__main__":
    main()

使用 Extractor.xpath() 提取 html 文本

from spparser import Reader, Writer, Extractor

def main():
    '''
    demo.html
    <html lang="en">
    <head>
        <title>spparser</title>
    </head>
    <body>
        <ul id="container">
            <li class="object-1" tag="1"/>
            <li class="object-2"/>
            <li class="object-3"/>
        </ul>
    </body>
    </html>
    '''
    '''
    read_csv result: data = [{'a': '122github', 'b': '2'}, {'a': '-8spparser999', 'b': '4'}]
    '''
    html_text = Reader.read_anyfile("demo.html",line_by_line=False)
    res = Extractor.xpath("//title/text()",html_text)
    print(res)

if __name__ == "__main__":
    main()

异步读取文件

from spparser import Reader,Writer, AsyncReader, AsyncWriter
import asyncio

async def main():
    reader = AsyncReader.async_csv_reader("./src.csv",batch_size=10,each_line_type="dict",max_read_lines=100, debug=True)
    with AsyncWriter.async_csv_writer("./dest.csv") as writer:
        async for items in reader:
            #for item in items:
                # Parser process
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

当 debug 设置为 True 时,输出日志:

[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
...

对于mongodb异步读写:

async def main():
    reader = AsyncReader.async_mongo_reader(query={},collection="src_col", host="my_address",port=27017, database="my_db",username="my_name", password="my_pwd", batch_size=100,max_read_lines=1000)
    with AsyncWriter.async_mongo_writer(collection="dest_col", host="my_address",port=27017, database="my_db",username="my_name", password="my_pwd") as writer:
        async for items in getter:
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

0.4.10 版本增加了对 MySQL 异步读写的支持

async def main():
    sql = "CREATE TABLE IF NOT EXISTS TARGET_TABLE (field1 type1, field2 type2) DEFAULT CHARSET=utf8;"
    getter = AsyncReader.async_mysql_reader(query_sql="SELECT * FROM SRC_TABLE",host="localhost", port=None, database="test", username="username", password="password",batch_size=100,max_read_lines=1000)
    with AsyncWriter.async_mysql_writer(create_table_sql=sql,host="localhost", port=None, database="test", username="username", password="password") as writer:
        async for items in getter:
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

历史

0.2.10

  • async_anyfile_reader、async_anyfile_writer、async_csv_reader、async_csv_writer 支持。
  • 提取器支持中的 xpath、css、正则表达式选择器。

0.3.30

  • async_mongo_reader, async_mongo_writer 支持

0.4.10

  • async_mysql_reader, async_mysql_writer 支持

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

spparser-0.5.10.tar.gz (11.4 kB 查看哈希

已上传 source

内置分布

spparser-0.5.10-py3-none-any.whl (13.7 kB 查看哈希

已上传 py3