Greenplum + Kafka Real-Time Data Processing
# 1.Greenplum + Kafka

There are two ways to sync data from Kafka into Greenplum:

- Start the gpss service and register Kafka load jobs with it via gpsscli (the approach covered in detail here).
- Use the gpkafka utility, which wraps the functionality of gpss and gpsscli, to complete the same steps in one go.

For connecting Greenplum to Kafka, see the official documentation:
https://gpdb.docs.pivotal.io/5180/greenplum-kafka/load-from-kafka-example.html

# 2.Installation

## 2.1.Install the Kafka environment

## 2.2.Install gpss

step 1. Install gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg

```
gppkg -i /home/gpkafka/gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg
==========================================================================
GPSS installation is complete!
To proceed, create gpss extension in the target database with:
"CREATE EXTENSION gpss;"
==========================================================================
20220120:08:45:44:008599 gppkg:tcloud:gpadmin-[INFO]:-gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg successfully installed.
```

step 2. Install the extension

```
[gpadmin@tcloud ~]$ psql
psql (9.4.24)
Type "help" for help.

# switch database
gpdb=# \c datacenter
# install gpss
datacenter=# CREATE EXTENSION gpss;
```

Error 1️⃣:

```
ERROR:  could not open extension control file "/usr/local/greenplum-db-6.13.0/share/postgresql/extension/gpss.control": No such file or directory
```

Add the two files gpss.control and gpss--1.0.sql to the directory named in the error, /usr/local/greenplum-db-6.13.0/share/postgresql/extension/.

```
# after adding gpss.control and gpss--1.0.sql, retry the installation
datacenter=# CREATE EXTENSION gpss;
```

Error 2️⃣:

```
ERROR:  could not access file "$libdir/gpfmt_gpss.so": No such file or directory
```

Add the three files gpfmt_gpss.so, gpfmt_protobuf.so, and gpss.so to the directory /usr/local/greenplum-db-6.13.0/lib/postgresql; all three are referenced by gpss--1.0.sql.

```
# after adding gpfmt_gpss.so, gpfmt_protobuf.so, and gpss.so, retry the installation
datacenter=# CREATE EXTENSION gpss;
CREATE EXTENSION
```
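Once CREATE EXTENSION finally succeeds, it is worth confirming that the extension really registered. A minimal sketch, assuming a Greenplum 6 cluster (based on PostgreSQL 9.4, so the pg_extension catalog is available):

```
-- Run in the database where CREATE EXTENSION gpss was issued;
-- returns one row with the installed version if registration succeeded.
SELECT extname, extversion
FROM   pg_extension
WHERE  extname = 'gpss';
```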
# 3.Real-time data processing with Greenplum + Kafka

step 1. Start Kafka

```
# start zookeeper
bin/zkServer.sh start
# start kafka
bin/kafka-server-start.sh -daemon ../config/server.properties
```

step 2. Create the gpss extension

Before Kafka message data can be loaded into Greenplum, the Greenplum-Kafka integration formatter functions must be registered in every database into which Kafka data will be written.

```
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.

test=# CREATE EXTENSION gpss;
```

step 3. Create the sample table

The Kafka data is in JSON format; a sample record:

```
{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
```

Of these fields, package_name, appkey, time, phone_udid, os, idfa, phone_imei, cpid, last_cpid, and phone_number are used.

```
CREATE TABLE tbl_novel_mobile_log (
    package_name text,
    appkey text,
    ts bigint,
    phone_udid text,
    os character varying(20),
    idfa character varying(64),
    phone_imei character varying(20),
    cpid text,
    last_cpid text,
    phone_number character varying(20)
);
```

step 4. Create the gpkafka.yaml configuration file

Contents of the gpkafka_mobile_yaml file:

```
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kafkaip:9092
      TOPIC: mobile_info
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: tbl_novel_mobile_log
    MAPPING:
      - NAME: package_name
        EXPRESSION: (jdata->>'package_name')::text
      - NAME: appkey
        EXPRESSION: (jdata->>'appkey')::text
      - NAME: ts
        EXPRESSION: (jdata->>'time')::bigint
      - NAME: phone_udid
        EXPRESSION: (jdata->>'phone_udid')::text
      - NAME: os
        EXPRESSION: (jdata->>'os')::text
      - NAME: idfa
        EXPRESSION: (jdata->>'idfa')::text
      - NAME: phone_imei
        EXPRESSION: (jdata->>'phone_imei')::text
      - NAME: cpid
        EXPRESSION: (jdata->>'cpid')::text
      - NAME: last_cpid
        EXPRESSION: (jdata->>'last_cpid')::text
      - NAME: phone_number
        EXPRESSION: (jdata->>'phone_number')::text
  COMMIT:
    MAX_ROW: 1000
```
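Each MAPPING entry is an SQL expression evaluated inside Greenplum, so the casts can be sanity-checked in psql before any load runs. A minimal sketch against a trimmed-down sample record, assuming the Greenplum build provides the json type and ->> operator (the MAPPING expressions above rely on the same operators); the inline literal is only for illustration:

```
-- Evaluate a few of the MAPPING expressions against a hand-built json value;
-- each output column should come back with the type declared in the table DDL.
SELECT (jdata->>'package_name')::text AS package_name,
       (jdata->>'time')::bigint       AS ts,
       (jdata->>'os')::varchar(20)    AS os
FROM  (SELECT '{"time":1550198435941,"package_name":"com.esbook.reader","os":"android"}'::json AS jdata) AS sample;
```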
step 5. Create the topic

```
bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1 --topic mobile_info
```

step 6. Create a Kafka producer and publish some records

```
bin/kafka-console-producer.sh --broker-list kafkaIP:9092 --topic mobile_info
>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"Le X620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader-1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUS A6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-TlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINA MOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-NBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPO A33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MI MAX 2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}
```

step 7. Run gpkafka to load the data

```
[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID    StartTime                      EndTime                       BeginOffset    EndOffset
0              2019-02-27T09:26:27.989312Z    2019-02-27T09:26:27.99517Z    0              5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC
```

step 8. Check the progress of the load operation (optional)

```
[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID    StartTime                      EndTime                       BeginOffset    EndOffset
0              2019-02-27T09:26:27.989312Z    2019-02-27T09:26:27.99517Z    0              5
```

step 9. View the table data

```
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.

lottu=# select * from tbl_novel_mobile_log ;
   package_name    |                  appkey                  |      ts       |            phone_udid            |   os    | idfa |   phone_imei    |       cpid        | last_cpid | phone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+-------------------+-----------+--------------
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android |      | 861738033581011 | blp1375_13621_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android |      | 867520045576831 | blf1298_12242_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android |      | 860364049874919 | blf1298_12242_001 |           | 15077113477
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android |      | 869800021106037 | blp1375_14526_003 |           |
 cn.wejuan.reader  | 307A5C626F2F76646B74606F2F736460656473   | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android |      | 862245038046551 | blf1298_14411_001 |           |
(5 rows)
```
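The ts column holds the raw time field, which is epoch milliseconds. A small follow-up query (a sketch, not part of the original walkthrough) converts it to a readable timestamp when inspecting loaded rows:

```
-- to_timestamp() takes epoch seconds, so divide the millisecond value by 1000.
SELECT package_name,
       ts,
       to_timestamp(ts / 1000.0) AS event_time
FROM   tbl_novel_mobile_log
ORDER  BY event_time;
```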
# 4.Consuming data from Kafka into Greenplum with gpss

## 4.1.Create the configuration

step 1. Prepare a Kafka producer and consumer

```
# kafka producer:
kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test
# kafka consumer:
kafka-console-consumer.sh --bootstrap-server 192.168.12.115:7776 --topic gpss_test --from-beginning
```

step 2. Configure the host and port of the gpss service

gpss4ic.json:

```
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319,
        "ReuseTables": false
    }
}
```

step 3. Configuration files for loading Kafka data into Greenplum

1. Configuration file for loading "|"-delimited streaming data, kafka_testdata_delimited.yaml:

```
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: tid
          TYPE: integer
        - NAME: tcode
          TYPE: varchar
        - NAME: tname
          TYPE: varchar
      FORMAT: delimited
      DELIMITED_OPTION:
        DELIMITER: "|"
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
    METADATA:
      SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
```

2. Configuration file for loading JSON-formatted streaming data, kafka_testdata_json.yaml:

```
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: jdata
          TYPE: json
      FORMAT: json
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
    MAPPING:
      - NAME: tid
        EXPRESSION: (jdata->>'tid')::int
      - NAME: tcode
        EXPRESSION: (jdata->>'tcode')::varchar
      - NAME: tname
        EXPRESSION: (jdata->>'tname')::varchar
    METADATA:
      SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
```
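Both job configurations write into ylorder.test_heap, whose DDL is not shown in the original. A definition matching the declared COLUMNS/MAPPING might look like the following; the DISTRIBUTED BY choice is an illustrative assumption, not taken from the source:

```
-- Hypothetical target table matching the three declared columns above.
CREATE TABLE ylorder.test_heap (
    tid   integer,
    tcode varchar,
    tname varchar
) DISTRIBUTED BY (tid);
```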
## 4.2.Start the job

step 1. Run the ETL load with gpss. Start the gpss service:

```
gpss gpss4ic.json
```

Log output:

```
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-using config file: gpss4ic.json
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-config file content: { "ListenAddress": { "Host": "mdw", "Port": 50007, "Certificate": { "CertFile": "", "KeyFile": "", "CAFile": "" } }, "Gpfdist": { "Host": "mdw", "Port": 8319, "ReuseTables": false, "Certificate": { "CertFile": "", "KeyFile": "", "CAFile": "" }, "BindAddress": "0.0.0.0" } }
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss-listen-address-prefix: mdw:50007
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss will use random external table name, external table won't get reused
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpfdist listening on 0.0.0.0:8319
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss listening on mdw:50007
```

step 2. Submit a job:

```
gpsscli submit --name kafkajson2gp --gpss-port 50007 --gpss-host mdw ./kafka_testdata_json.yaml
```

Output:

```
20200225:22:09:16 gpsscli:gpadmin:greenplum-001:010722-[INFO]:-JobID: kafkajson2gp
```

step 3. List the jobs:

```
gpsscli list --all --gpss-port 50007 --gpss-host mdw
```

Output:

```
JobID           GPHost    GPPort    DataBase    Schema     Table        Topic        Status
kafkajson2gp    mdw       5432      yloms       ylorder    test_heap    gpss_test    JOB_STOPPED
```

step 4. Start the job:

```
gpsscli start kafkajson2gp --gpss-port 50007 --gpss-host mdw
```

Output:

```
20200225:22:10:24 gpsscli:gpadmin:greenplum-001:010756-[INFO]:-JobID: kafkajson2gp is started
```

step 5. List the jobs again:

```
gpsscli list --all --gpss-port 50007 --gpss-host mdw
```

Output:

```
JobID           GPHost    GPPort    DataBase    Schema     Table        Topic        Status
kafkajson2gp    mdw       5432      yloms       ylorder    test_heap    gpss_test    JOB_RUNNING
```

step 6. Stop the job:

```
gpsscli stop kafkajson2gp --gpss-port 50007 --gpss-host mdw
```

Output:

```
20200225:22:11:04 gpsscli:gpadmin:greenplum-001:010801-[INFO]:-Stop a job: kafkajson2gp, status JOB_STOPPED
```

## 4.3.Running the load with gpkafka

> gpkafka load can be understood as replacing the gpsscli submit, start, and related commands.

```
gpkafka --config gpss4ic.json load kafka_testdata_json.yaml
```

Check the offsets saved in the database:

```
select * from kafka_test.gpkafka_data_from_kafka_12ead185469b45cc8e5be3c9f0ea14a2 limit 10;
```

# 5.Real-time data synchronization

## 5.1.maxwell + Kafka + bireme

This approach uses maxwell + Kafka + bireme to replicate MySQL data into Greenplum in real time. maxwell parses the MySQL binlog as it is written and sends the resulting JSON-formatted data to Kafka; Kafka serves mainly as the message relay in this design; and bireme reads the Kafka messages and applies them to the Greenplum database for incremental synchronization.

maxwell is an application that reads the MySQL binary log (binlog) in real time, generates JSON-formatted messages, and publishes them as a producer to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms; Kafka is the messaging system maxwell supports most completely. Common use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds into search engines, data-partition migration, and binlog-based rollback schemes for database cutovers. maxwell is quite active on GitHub; its home page is https://github.com/zendesk/maxwell.

maxwell provides the following main features:

1. Supports full-data initialization via SELECT * FROM table.
1. Supports GTID; after a MySQL failover, it recovers the binlog position automatically.
1. Can partition the data to mitigate data skew; data sent to Kafka can be partitioned at the database, table, column, and other levels.
1. Works by impersonating a MySQL slave: it opens a dump-thread connection on the primary, receives binlog events, and assembles them into JSON strings using the schema information; it can accept DDL, XID, row, and other event types.

bireme is an incremental synchronization tool for Greenplum data warehouses; it currently supports MySQL, PostgreSQL, and MongoDB data sources, and maxwell + Kafka is one of the supported source types. As a Kafka consumer, bireme applies source-change records to Greenplum using a DELETE + COPY approach, which is faster and performs better than replaying individual INSERT, UPDATE, and DELETE statements (a sketch of this pattern appears at the end of this section). Its main performance feature is micro-batch loading (the default load delay is 10 seconds), but it requires that every synchronized table have a primary key in both the source and target databases. The bireme home page is https://github.com/HashDataInc/bireme/.

Kafka acts as the message middleware bridging maxwell and bireme in this architecture; both the upstream and downstream components depend on it.
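To make the DELETE + COPY strategy concrete, here is a minimal sketch of one bireme-style micro-batch. The staging-table name, change-record layout, and file path are illustrative assumptions, not bireme's actual internals:

```
-- One micro-batch, applied transactionally. Assumes a target table
-- target_table(id, payload) whose primary key is id.
BEGIN;
CREATE TEMP TABLE batch_changes (op text, id integer, payload text) ON COMMIT DROP;
COPY batch_changes FROM '/tmp/batch.csv' CSV;   -- bulk-load the whole batch at once
DELETE FROM target_table t
USING  batch_changes c
WHERE  t.id = c.id;                             -- drop the old versions of changed rows
INSERT INTO target_table (id, payload)
SELECT id, payload
FROM   batch_changes
WHERE  op <> 'delete';                          -- re-insert the current versions
COMMIT;
```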
## 5.2.Canal + Kafka + ClientAdapter

Canal is an open-source component from Alibaba that is similar to maxwell in both functionality and implementation. Its main purpose is incremental log parsing for MySQL databases, providing incremental data subscription and consumption. Its working principle is fairly simple:

1. Canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends the dump protocol to the MySQL master.
1. The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal).
1. Canal parses the binary log objects (raw byte streams).

A Canal Server represents one running Canal instance, corresponding to one JVM. An Instance corresponds to one data queue; one Server hosts 1..n Instances. Within the Instance module, EventParser handles data-source access, interacting with the master as a simulated slave and parsing the protocol. EventSink is the connector between the Parser and the Store, performing data filtering, transformation, and distribution. EventStore is responsible for storing the data, and MetaManager manages incremental subscription and consumption metadata.

Since version 1.1.1, Canal supports delivering the binlog data received by the Canal Server directly to a message queue; Kafka and RocketMQ are supported out of the box. Earlier versions of Canal provided only a client API, so users had to write their own client programs to implement the consumption logic. Version 1.1.1 also added client-adapter, which provides ready-made adapters for landing data on the client side, along with their startup machinery.
Seven
Oct. 17, 2022, 1:10 p.m.