Skip to main content

Singer.io tap 用于从 MySQL 和 MariaDB 中提取数据 - PipelineWise 兼容

项目描述

pipelinewise-tap-mysql

PyPI 版本 PyPI - Python 版本 执照:麻省理工学院

Singer tap 从MySQL数据库中提取数据并按照Singer 规范生成 JSON 格式的数据。

这是一个PipelineWise兼容的水龙头连接器。

如何使用它

运行此水龙头的推荐方法是从PipelineWise使用它。从 PipelineWise 运行它时,您无需使用 JSON 文件配置此分流器,并且大部分操作都是自动化的。请查看Tap MySQL的相关文档

如果您想独立运行此Singer Tap ,请进一步阅读。

用法

tap-mysql本节通过从表中提取数据来深入了解 的基本用法。它假定您可以连接到 MySQL 数据库并从中读取。

安装

首先,确保您的系统上安装了 Python 3,或者按照MacUbuntu的这些安装说明进行操作。

推荐使用 virtualenv:

  python3 -m venv venv
  pip install pipelinewise-tap-mysql

或者

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .

有一个源数据库

这个 MySQL 数据库中有一些重要的业务数据孤岛——我们需要提取它。这是我们要同步的表:

mysql> select * from example_db.animals;
+----|----------|----------------------+
| id | name     | likes_getting_petted |
+----|----------|----------------------+
|  1 | aardvark |                    0 |
|  2 | bear     |                    0 |
|  3 | cow      |                    1 |
+----|----------|----------------------+
3 rows in set (0.00 sec)

创建配置文件

创建一个包含数据库连接凭据的配置文件,请参见示例.

配置参数列表:

范围 类型 必需的 默认 描述
主持人 细绳 是的 - mysql/mariadb 主机
港口 整数 是的 - mysql/mariadb 端口
用户 细绳 是的 - 数据库用户名
密码 细绳 是的 - 数据库密码
游标类 细绳 pymysql.cursors.SSCursor 设置 PyMYSQL 使用的游标类
数据库 细绳 - 要使用的数据库,None 表示不使用特定的数据库。由 PyMYSQL 使用
server_id 整数 错误的 随机生成的 int 当此分接头连接到服务器时,用作从属 ID
过滤器数据库 细绳 错误的 - 逗号分隔的模式列表,仅从特定模式中提取表并提高数据提取性能
使用_gtid 布尔 错误的 错误的
使用 GTID 启用基于日志的复制的标志
引擎 字符串(“mysql”或“mariadb”) 错误的 'mysql' 指示服务器的风格,用于带有 GTID 的 LOG_BASED
ssl 字符串(“真”) 错误的 启用 SSL 连接
ssl_ca 细绳 - 对于自签名 SSL
ssl_cert 细绳 - 对于自签名 SSL
ssl_key 细绳 - 对于自签名 SSL
内部主机名 细绳 - 覆盖谷歌云的匹配主机名
session_sqls 字符串列表 ['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600'] 动态设置会话变量。

发现模式

可以在发现模式下调用点击以查找数据库中可用的表和列:

$ tap-mysql --config config.json --discover

输出一个发现的目录,其中包含每个表的 JSON 模式描述。一个源表直接对应一个 Singer 流。

{
  "streams": [
    {
      "tap_stream_id": "example_db-animals",
      "table_name": "animals",
      "schema": {
        "type": "object",
        "properties": {
          "name": {
            "inclusion": "available",
            "type": [
              "null",
              "string"
            ],
            "maxLength": 255
          },
          "id": {
            "inclusion": "automatic",
            "minimum": -2147483648,
            "maximum": 2147483647,
            "type": [
              "null",
              "integer"
            ]
          },
          "likes_getting_petted": {
            "inclusion": "available",
            "type": [
              "null",
              "boolean"
            ]
          }
        }
      },
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "row-count": 3,
            "table-key-properties": [
              "id"
            ],
            "database-name": "example_db",
            "selected-by-default": false,
            "is-view": false,
          }
        },
        {
          "breadcrumb": [
            "properties",
            "id"
          ],
          "metadata": {
            "sql-datatype": "int(11)",
            "selected-by-default": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "name"
          ],
          "metadata": {
            "sql-datatype": "varchar(255)",
            "selected-by-default": true
          }
        },
        {
          "breadcrumb": [
            "properties",
            "likes_getting_petted"
          ],
          "metadata": {
            "sql-datatype": "tinyint(1)",
            "selected-by-default": true
          }
        }
      ],
      "stream": "animals"
    }
  ]
}

字段选择

在同步模式下,使用目录并查找已在其关联元数据条目中tap-mysql标记为选中的表和字段。

将 tap 的发现模式的输出重定向到一个文件,以便对其进行修改:

$ tap-mysql -c config.json --discover > properties.json

然后编辑properties.json以进行选择。在此示例中,我们需要 animals表格。流的元数据条目(与 关联"breadcrumb": [])获得顶级selected标志,其列的元数据条目也是如此。此外,我们将animals使用FULL_TABLE策略标记要复制的表。有关更多信息,请参阅复制方法和状态文件

[
  {
    "breadcrumb": [],
    "metadata": {
      "row-count": 3,
      "table-key-properties": [
        "id"
      ],
      "database-name": "example_db",
      "selected-by-default": false,
      "is-view": false,
      "selected": true,
      "replication-method": "FULL_TABLE"
    }
  },
  {
    "breadcrumb": [
      "properties",
      "id"
    ],
    "metadata": {
      "sql-datatype": "int(11)",
      "selected-by-default": true,
      "selected": true
    }
  },
  {
    "breadcrumb": [
      "properties",
      "name"
    ],
    "metadata": {
      "sql-datatype": "varchar(255)",
      "selected-by-default": true,
      "selected": true
    }
  },
  {
    "breadcrumb": [
      "properties",
      "likes_getting_petted"
    ],
    "metadata": {
      "sql-datatype": "tinyint(1)",
      "selected-by-default": true,
      "selected": true
    }
  }
]

同步模式

使用描述字段和表选择的属性目录,可以在同步模式下调用点击:

$ tap-mysql -c config.json --properties properties.json

消息按照 Singer 规范写入标准输出。生成的 JSON 数据流可以由 Singer 目标使用。

{"value": {"currently_syncing": "example_db-animals"}, "type": "STATE"}

{"key_properties": ["id"], "stream": "animals", "schema": {"properties": {"name": {"inclusion": "available", "maxLength": 255, "type": ["null", "string"]}, "likes_getting_petted": {"inclusion": "available", "type": ["null", "boolean"]}, "id": {"inclusion": "automatic", "minimum": -2147483648, "type": ["null", "integer"], "maximum": 2147483647}}, "type": "object"}, "type": "SCHEMA"}

{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}

{"record": {"name": "aardvark", "likes_getting_petted": false, "id": 1}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}

{"record": {"name": "bear", "likes_getting_petted": false, "id": 2}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}

{"record": {"name": "cow", "likes_getting_petted": true, "id": 3}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}

{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}

{"value": {"currently_syncing": "example_db-animals", "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}

{"value": {"currently_syncing": null, "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}

复制方法和状态文件

在上面的示例中,我们调用tap-mysql时没有提供状态文件,也没有指定复制方法。复制给定表的方法FULL_TABLELOG_BASEDINCREMENTAL

LOG_BASED

LOG_BASED 复制利用服务器的二进制日志(binlogs),这种方法可以与主服务器一起使用,tap 充当副本并请求主服务器流式传输日志事件,然后tap 消耗与行更改有关的事件(插入、更新、删除)、binlog 文件轮换和 gtid 事件。

Log_based 方法总是需要一个初始同步来获取表的快照和当前的 binlog 坐标/gtid 位置。

tap 支持两种消费日志事件的方式:使用 binlog 坐标或 GTID,默认行为是使用 binlog 坐标,在打开use_gtid标志时,您必须指定引擎风格(mariadb/mysql),因为 GTID 实现有很大不同这两个引擎。

开启use_gtidflag且引擎为MariaDB时,tap会根据状态中已有的binlog坐标动态推断GTID pos,如果引擎为mysql,则会失败。

使用 binlog 坐标时的状态

{
  "bookmarks": {
    "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244},
    "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42},
    "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100}
  }
}

使用 GTID 时的状态

{
  "bookmarks": {
    "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, "gtid": "0:364864374:599"},
    "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, "gtid": "0:364864374:375"},
    "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, "gtid": "0:364864374:399"}
  }
}

全表

每次调用 tap 时,全表复制都会从源表中提取所有数据。

增加的

增量复制与状态文件结合使用,仅在每次调用点击时提取新记录。这也需要在表的元数据中指定复制键。

例子

让我们再次同步animals表,但这次使用增量复制。复制方法和复制键在属性文件中表的元数据条目中设置:

{
  "streams": [
    {
      "tap_stream_id": "example_db-animals",
      "table_name": "animals",
      "schema": { ... },
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "row-count": 3,
            "table-key-properties": [
              "id"
            ],
            "database-name": "example_db",
            "selected-by-default": false,
            "is-view": false,
            "replication-method": "INCREMENTAL",
            "replication-key": "id"
          }
        },
        ...
      ],
      "stream": "animals"
    }
  ]
}

到目前为止,我们还没有有意义的状态,所以只需在没有状态文件的情况下再次以同步模式调用 tap:

$ tap-mysql -c config.json --properties properties.json

FULL_TABLE输出消息看起来与使用默认复制方法复制表时非常相似。一个重要的区别是 STATE消息现在包含一个replication_key_value书签或高水位标记,用于提取的数据:

{ “类型” “状态” “值” { “当前同步” “example_db-animals” }}    

{ “流” “动物” “类型” “SCHEMA” “模式” { “类型” “对象” “属性” { “id” { “类型” [ “空” “整数” ],“最小值” -2147483648 “最大值” 2147483647 “包含” “自动” },“名称” { “类型” [ “空” “字符串”                     ],“包含” “可用” “maxLength” 255 },“likes_getting_petted” { “类型” [ “null” “布尔” ],“包含” “可用” }}},“key_properties” [ “身份证” ]}            

{ “流” “动物” “类型” “ACTIVATE_VERSION” “版本” 1509135204169 }     

{ “流” “动物” “类型” “记录” “版本” 1509135204169 “记录” { “id” 1 “名称” “土豚” “likes_getting_petted” }}            

{ “流” “动物” “类型” “记录” “版本” 1509135204169