Singer.io tap 用于从 MySQL 和 MariaDB 中提取数据 - PipelineWise 兼容
项目描述
pipelinewise-tap-mysql
Singer tap 从MySQL数据库中提取数据并按照Singer 规范生成 JSON 格式的数据。
这是一个PipelineWise兼容的水龙头连接器。
如何使用它
运行此水龙头的推荐方法是从PipelineWise使用它。从 PipelineWise 运行它时,您无需使用 JSON 文件配置此分流器,并且大部分操作都是自动化的。请查看Tap MySQL的相关文档
如果您想独立运行此Singer Tap ,请进一步阅读。
用法
tap-mysql本节通过从表中提取数据来深入了解 的基本用法。它假定您可以连接到 MySQL 数据库并从中读取。
安装
首先,确保您的系统上安装了 Python 3,或者按照Mac或 Ubuntu的这些安装说明进行操作。
推荐使用 virtualenv:
python3 -m venv venv
pip install pipelinewise-tap-mysql
或者
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
有一个源数据库
这个 MySQL 数据库中有一些重要的业务数据孤岛——我们需要提取它。这是我们要同步的表:
mysql> select * from example_db.animals;
+----|----------|----------------------+
| id | name | likes_getting_petted |
+----|----------|----------------------+
| 1 | aardvark | 0 |
| 2 | bear | 0 |
| 3 | cow | 1 |
+----|----------|----------------------+
3 rows in set (0.00 sec)
创建配置文件
创建一个包含数据库连接凭据的配置文件,请参见示例.
配置参数列表:
| 范围 | 类型 | 必需的 | 默认 | 描述 |
|---|---|---|---|---|
| 主持人 | 细绳 | 是的 | - | mysql/mariadb 主机 |
| 港口 | 整数 | 是的 | - | mysql/mariadb 端口 |
| 用户 | 细绳 | 是的 | - | 数据库用户名 |
| 密码 | 细绳 | 是的 | - | 数据库密码 |
| 游标类 | 细绳 | 不 | pymysql.cursors.SSCursor |
设置 PyMYSQL 使用的游标类 |
| 数据库 | 细绳 | 不 | - | 要使用的数据库,None 表示不使用特定的数据库。由 PyMYSQL 使用 |
| server_id | 整数 | 错误的 | 随机生成的 int | 当此分接头连接到服务器时,用作从属 ID |
| 过滤器数据库 | 细绳 | 错误的 | - | 逗号分隔的模式列表,仅从特定模式中提取表并提高数据提取性能 |
| 使用_gtid | 布尔 | 错误的 | 错误的 |
使用 GTID 启用基于日志的复制的标志 |
| 引擎 | 字符串(“mysql”或“mariadb”) | 错误的 | 'mysql' | 指示服务器的风格,用于带有 GTID 的 LOG_BASED |
| ssl | 字符串(“真”) | 不 | 错误的 | 启用 SSL 连接 |
| ssl_ca | 细绳 | 不 | - | 对于自签名 SSL |
| ssl_cert | 细绳 | 不 | - | 对于自签名 SSL |
| ssl_key | 细绳 | 不 | - | 对于自签名 SSL |
| 内部主机名 | 细绳 | 不 | - | 覆盖谷歌云的匹配主机名 |
| session_sqls | 字符串列表 | 不 | ['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600'] |
动态设置会话变量。 |
发现模式
可以在发现模式下调用点击以查找数据库中可用的表和列:
$ tap-mysql --config config.json --discover
输出一个发现的目录,其中包含每个表的 JSON 模式描述。一个源表直接对应一个 Singer 流。
{
"streams": [
{
"tap_stream_id": "example_db-animals",
"table_name": "animals",
"schema": {
"type": "object",
"properties": {
"name": {
"inclusion": "available",
"type": [
"null",
"string"
],
"maxLength": 255
},
"id": {
"inclusion": "automatic",
"minimum": -2147483648,
"maximum": 2147483647,
"type": [
"null",
"integer"
]
},
"likes_getting_petted": {
"inclusion": "available",
"type": [
"null",
"boolean"
]
}
}
},
"metadata": [
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false,
}
},
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"sql-datatype": "int(11)",
"selected-by-default": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"sql-datatype": "varchar(255)",
"selected-by-default": true
}
},
{
"breadcrumb": [
"properties",
"likes_getting_petted"
],
"metadata": {
"sql-datatype": "tinyint(1)",
"selected-by-default": true
}
}
],
"stream": "animals"
}
]
}
字段选择
在同步模式下,使用目录并查找已在其关联元数据条目中tap-mysql标记为选中的表和字段。
将 tap 的发现模式的输出重定向到一个文件,以便对其进行修改:
$ tap-mysql -c config.json --discover > properties.json
然后编辑properties.json以进行选择。在此示例中,我们需要
animals表格。流的元数据条目(与 关联"breadcrumb": [])获得顶级selected标志,其列的元数据条目也是如此。此外,我们将animals使用FULL_TABLE策略标记要复制的表。有关更多信息,请参阅复制方法和状态文件。
[
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false,
"selected": true,
"replication-method": "FULL_TABLE"
}
},
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"sql-datatype": "int(11)",
"selected-by-default": true,
"selected": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"sql-datatype": "varchar(255)",
"selected-by-default": true,
"selected": true
}
},
{
"breadcrumb": [
"properties",
"likes_getting_petted"
],
"metadata": {
"sql-datatype": "tinyint(1)",
"selected-by-default": true,
"selected": true
}
}
]
同步模式
使用描述字段和表选择的属性目录,可以在同步模式下调用点击:
$ tap-mysql -c config.json --properties properties.json
消息按照 Singer 规范写入标准输出。生成的 JSON 数据流可以由 Singer 目标使用。
{"value": {"currently_syncing": "example_db-animals"}, "type": "STATE"}
{"key_properties": ["id"], "stream": "animals", "schema": {"properties": {"name": {"inclusion": "available", "maxLength": 255, "type": ["null", "string"]}, "likes_getting_petted": {"inclusion": "available", "type": ["null", "boolean"]}, "id": {"inclusion": "automatic", "minimum": -2147483648, "type": ["null", "integer"], "maximum": 2147483647}}, "type": "object"}, "type": "SCHEMA"}
{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}
{"record": {"name": "aardvark", "likes_getting_petted": false, "id": 1}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"record": {"name": "bear", "likes_getting_petted": false, "id": 2}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"record": {"name": "cow", "likes_getting_petted": true, "id": 3}, "stream": "animals", "version": 1509133344771, "type": "RECORD"}
{"stream": "animals", "version": 1509133344771, "type": "ACTIVATE_VERSION"}
{"value": {"currently_syncing": "example_db-animals", "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}
{"value": {"currently_syncing": null, "bookmarks": {"example_db-animals": {"initial_full_table_complete": true}}}, "type": "STATE"}
复制方法和状态文件
在上面的示例中,我们调用tap-mysql时没有提供状态文件,也没有指定复制方法。复制给定表的方法FULL_TABLE是LOG_BASED和INCREMENTAL。
LOG_BASED
LOG_BASED 复制利用服务器的二进制日志(binlogs),这种方法可以与主服务器一起使用,tap 充当副本并请求主服务器流式传输日志事件,然后tap 消耗与行更改有关的事件(插入、更新、删除)、binlog 文件轮换和 gtid 事件。
Log_based 方法总是需要一个初始同步来获取表的快照和当前的 binlog 坐标/gtid 位置。
tap 支持两种消费日志事件的方式:使用 binlog 坐标或 GTID,默认行为是使用 binlog 坐标,在打开use_gtid标志时,您必须指定引擎风格(mariadb/mysql),因为 GTID 实现有很大不同这两个引擎。
开启use_gtidflag且引擎为MariaDB时,tap会根据状态中已有的binlog坐标动态推断GTID pos,如果引擎为mysql,则会失败。
使用 binlog 坐标时的状态
{
"bookmarks": {
"example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244},
"example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42},
"example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100}
}
}
使用 GTID 时的状态
{
"bookmarks": {
"example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, "gtid": "0:364864374:599"},
"example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, "gtid": "0:364864374:375"},
"example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, "gtid": "0:364864374:399"}
}
}
全表
每次调用 tap 时,全表复制都会从源表中提取所有数据。
增加的
增量复制与状态文件结合使用,仅在每次调用点击时提取新记录。这也需要在表的元数据中指定复制键。
例子
让我们再次同步animals表,但这次使用增量复制。复制方法和复制键在属性文件中表的元数据条目中设置:
{
"streams": [
{
"tap_stream_id": "example_db-animals",
"table_name": "animals",
"schema": { ... },
"metadata": [
{
"breadcrumb": [],
"metadata": {
"row-count": 3,
"table-key-properties": [
"id"
],
"database-name": "example_db",
"selected-by-default": false,
"is-view": false,
"replication-method": "INCREMENTAL",
"replication-key": "id"
}
},
...
],
"stream": "animals"
}
]
}
到目前为止,我们还没有有意义的状态,所以只需在没有状态文件的情况下再次以同步模式调用 tap:
$ tap-mysql -c config.json --properties properties.json
FULL_TABLE输出消息看起来与使用默认复制方法复制表时非常相似。一个重要的区别是
STATE消息现在包含一个replication_key_value书签或高水位标记,用于提取的数据:
{ “类型” :“状态” ,“值” :{ “当前同步” :“example_db-animals” }}
{ “流” :“动物” ,“类型” :“SCHEMA” ,“模式” :{ “类型” :“对象” ,“属性” :{ “id” :{ “类型” :[ “空” ,“整数” ],“最小值” :-2147483648 ,“最大值” :2147483647 ,“包含” :“自动” },“名称” :{ “类型” :[ “空” ,“字符串” ],“包含” :“可用” ,“maxLength” :255 },“likes_getting_petted” :{ “类型” :[ “null” ,“布尔” ],“包含” :“可用” }}},“key_properties” :[ “身份证” ]}
{ “流” :“动物” ,“类型” :“ACTIVATE_VERSION” ,“版本” :1509135204169 }
{ “流” :“动物” ,“类型” :“记录” ,“版本” :1509135204169 ,“记录” :{ “id” :1 ,“名称” :“土豚” ,“likes_getting_petted” :假}}
{ “流” :“动物” ,“类型” :“记录” ,“版本” :1509135204169 ,