searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

kafka数据同步到mysql

2023-09-25 08:44:50
11
0

kafka安装

使用docker-compose进行安装,docker-compose文件如下:

# Kafka plus ZooKeeper, each given a fixed address on the "proxy" network.
version: '2'
networks:
  proxy:
    ipam:
      config:
        - subnet: '172.16.0.0/24'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    networks:
      proxy:
        ipv4_address: '172.16.0.8'
    ports:
      - '2181:2181'
  kafka:
    image: wurstmeister/kafka:latest
    networks:
      proxy:
        ipv4_address: '172.16.0.9'
    ports:
      - '9092:9092'
    environment:
      # Host address handed to the Kafka image as its advertised host name.
      KAFKA_ADVERTISED_HOST_NAME: '192.168.183.142'
      # Presumably topic:partitions:replicas, per the image's convention (confirm).
      KAFKA_CREATE_TOPICS: 'test:1:1'
      # Reaches ZooKeeper through its compose service name.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'

这样安装的kafka是没有密码的,下面为kafka配置密码

先将kafka容器中的config和bin目录复制到本机目录(命令中的277是kafka容器的ID)

# Copy Kafka's config and bin directories from container "277"
# (a container name or ID prefix) to the host.
docker cp 277:/opt/kafka/config /root/docker-build/kafka/config/
docker cp 277:/opt/kafka/bin /root/docker-build/kafka/bin/
添加密码

然后将容器删除 

# Stop and remove the containers and networks created from the compose file.
docker-compose down

修改config目录下的server.properties

############################# Server Basics #############################
# -1 is Kafka's "not set" value for the broker id; presumably the id is
# generated automatically (confirm).
broker.id=-1
# Both the bound and the advertised listener use SASL authentication over an
# unencrypted connection on port 9092.
# NOTE(review): this address (192.168.183.137) differs from
# KAFKA_ADVERTISED_HOST_NAME (192.168.183.142) in the compose file - confirm
# which host IP is intended, and that the address is bindable inside the container.
listeners=SASL_PLAINTEXT://192.168.183.137:9092
advertised.listeners=SASL_PLAINTEXT://192.168.183.137:9092
# Broker-to-broker traffic uses the same protocol with the PLAIN mechanism.
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Turns on ACL-based authorization.
# NOTE(review): no super.users or allow.everyone.if.no.acl.found entry is shown
# here - confirm how the authenticated user is granted access.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

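修改bin目录下的kafka-server-start.sh文件,使kafka启动时加载JAAS配置文件(原文此处的具体修改内容缺失;通常的做法是在脚本中通过KAFKA_OPTS添加 -Djava.security.auth.login.config=<JAAS配置文件路径>,并在该文件的KafkaServer段中配置用户名和密码)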

重新启动kafka,修改docker-compose.yml文件如下

# Same Kafka/ZooKeeper stack as before, with the edited config and bin
# directories mounted from the host into the Kafka container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    # Paths are relative to the directory containing this compose file.
    volumes:
      - ./config:/opt/kafka/config
      - ./bin:/opt/kafka/bin
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24

启动容器 

# Create and start the services in the background (detached mode).
docker-compose up -d

这样把kafka启动起来

测试步骤

在sp中启动任务

sql脚本

-- Source table: reads JSON records from the Kafka topic test_kafka,
-- authenticating with SASL/PLAIN and starting from the earliest offset.
create table goods_source (
  goods_id int,
  goods_price decimal(8,2),
  -- string rather than bare varchar: in Flink SQL a varchar with no length is varchar(1).
  goods_name string,
  goods_details string
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '101.43.164.4:9092',
  'topic' = 'test_kafka',
  'properties.group.id' = 'test-consumer-group-1',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="******";',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
-- Sink table: writes rows to the MySQL table my_goods_kafka over JDBC.
create table goods_target (
  goods_id int,
  goods_price decimal(8,2),
  -- string rather than bare varchar: in Flink SQL a varchar with no length is varchar(1).
  goods_name string,
  goods_details string,
  -- Declared but not validated by Flink; the JDBC sink uses it as the upsert key.
  PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://101.43.164.4:3306/cdc-sink?useSSL=false&characterEncoding=utf-8',
  'table-name' = 'my_goods_kafka',
  'username' = 'root',
  'password' = '******'
);
-- Copy every row read from goods_source into goods_target.
insert into goods_target
select
  goods_id,
  goods_price,
  goods_name,
  goods_details
from goods_source;

然后写代码,推送十条数据

这里是java写的推送,仅供参考

/**
 * Pushes ten sample goods records to the test_kafka topic, one JSON message
 * per record, waiting for each send to complete before sending the next.
 *
 * @throws ExecutionException   if a send fails
 * @throws InterruptedException if the thread is interrupted while waiting for a send
 */
@Test
    public void test1() throws ExecutionException, InterruptedException {
        // i runs 10..19, so exactly ten messages are sent (goods_id 15..24).
        for (int i = 10; i < 20; i++) {
            CdcTestGoods cdcTestGoods = new CdcTestGoods();
            cdcTestGoods.setGoods_id(5 + i);
            cdcTestGoods.setGoods_name("iphone 14 pro max 128G  " + i);
            cdcTestGoods.setGoods_details("京东618大降价,买到就是赚  " + i);
            cdcTestGoods.setGoods_price(5899f);
            // Serialize once and reuse the payload for both the send and the log line.
            String payload = JacksonUtils.getString(cdcTestGoods);
            // get() blocks until the send has completed.
            SendResult<String, String> result = kafkaTemplate.send("test_kafka", payload).get();
            log.info("sendMessageSync =>  {},message: {}", result, payload);
        }
    }

查看mysql表,可以看到同步过来的数据。注意:kafka只支持insert,不支持update

 

0条评论
0 / 1000
刘****猛
7文章数
0粉丝数
刘****猛
7 文章 | 0 粉丝
原创

kafka数据同步到mysql

2023-09-25 08:44:50
11
0

kafka安装

使用docker-compose进行安装,docker-compose文件如下:

# Kafka plus ZooKeeper, each given a fixed address on the "proxy" network.
version: '2'
networks:
  proxy:
    ipam:
      config:
        - subnet: '172.16.0.0/24'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    networks:
      proxy:
        ipv4_address: '172.16.0.8'
    ports:
      - '2181:2181'
  kafka:
    image: wurstmeister/kafka:latest
    networks:
      proxy:
        ipv4_address: '172.16.0.9'
    ports:
      - '9092:9092'
    environment:
      # Host address handed to the Kafka image as its advertised host name.
      KAFKA_ADVERTISED_HOST_NAME: '192.168.183.142'
      # Presumably topic:partitions:replicas, per the image's convention (confirm).
      KAFKA_CREATE_TOPICS: 'test:1:1'
      # Reaches ZooKeeper through its compose service name.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'

这样安装的kafka是没有密码的,下面为kafka配置密码

先将kafka容器中的config和bin目录复制到本机目录(命令中的277是kafka容器的ID)

# Copy Kafka's config and bin directories from container "277"
# (a container name or ID prefix) to the host.
docker cp 277:/opt/kafka/config /root/docker-build/kafka/config/
docker cp 277:/opt/kafka/bin /root/docker-build/kafka/bin/
添加密码

然后将容器删除 

# Stop and remove the containers and networks created from the compose file.
docker-compose down

修改config目录下的server.properties

############################# Server Basics #############################
# -1 is Kafka's "not set" value for the broker id; presumably the id is
# generated automatically (confirm).
broker.id=-1
# Both the bound and the advertised listener use SASL authentication over an
# unencrypted connection on port 9092.
# NOTE(review): this address (192.168.183.137) differs from
# KAFKA_ADVERTISED_HOST_NAME (192.168.183.142) in the compose file - confirm
# which host IP is intended, and that the address is bindable inside the container.
listeners=SASL_PLAINTEXT://192.168.183.137:9092
advertised.listeners=SASL_PLAINTEXT://192.168.183.137:9092
# Broker-to-broker traffic uses the same protocol with the PLAIN mechanism.
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Turns on ACL-based authorization.
# NOTE(review): no super.users or allow.everyone.if.no.acl.found entry is shown
# here - confirm how the authenticated user is granted access.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

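修改bin目录下的kafka-server-start.sh文件,使kafka启动时加载JAAS配置文件(原文此处的具体修改内容缺失;通常的做法是在脚本中通过KAFKA_OPTS添加 -Djava.security.auth.login.config=<JAAS配置文件路径>,并在该文件的KafkaServer段中配置用户名和密码)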

重新启动kafka,修改docker-compose.yml文件如下

# Same Kafka/ZooKeeper stack as before, with the edited config and bin
# directories mounted from the host into the Kafka container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    # Paths are relative to the directory containing this compose file.
    volumes:
      - ./config:/opt/kafka/config
      - ./bin:/opt/kafka/bin
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24

启动容器 

# Create and start the services in the background (detached mode).
docker-compose up -d

这样把kafka启动起来

测试步骤

在sp中启动任务

sql脚本

-- Source table: reads JSON records from the Kafka topic test_kafka,
-- authenticating with SASL/PLAIN and starting from the earliest offset.
create table goods_source (
  goods_id int,
  goods_price decimal(8,2),
  -- string rather than bare varchar: in Flink SQL a varchar with no length is varchar(1).
  goods_name string,
  goods_details string
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '101.43.164.4:9092',
  'topic' = 'test_kafka',
  'properties.group.id' = 'test-consumer-group-1',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="******";',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
-- Sink table: writes rows to the MySQL table my_goods_kafka over JDBC.
create table goods_target (
  goods_id int,
  goods_price decimal(8,2),
  -- string rather than bare varchar: in Flink SQL a varchar with no length is varchar(1).
  goods_name string,
  goods_details string,
  -- Declared but not validated by Flink; the JDBC sink uses it as the upsert key.
  PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://101.43.164.4:3306/cdc-sink?useSSL=false&characterEncoding=utf-8',
  'table-name' = 'my_goods_kafka',
  'username' = 'root',
  'password' = '******'
);
-- Copy every row read from goods_source into goods_target.
insert into goods_target
select
  goods_id,
  goods_price,
  goods_name,
  goods_details
from goods_source;

然后写代码,推送十条数据

这里是java写的推送,仅供参考

/**
 * Pushes ten sample goods records to the test_kafka topic, one JSON message
 * per record, waiting for each send to complete before sending the next.
 *
 * @throws ExecutionException   if a send fails
 * @throws InterruptedException if the thread is interrupted while waiting for a send
 */
@Test
    public void test1() throws ExecutionException, InterruptedException {
        // i runs 10..19, so exactly ten messages are sent (goods_id 15..24).
        for (int i = 10; i < 20; i++) {
            CdcTestGoods cdcTestGoods = new CdcTestGoods();
            cdcTestGoods.setGoods_id(5 + i);
            cdcTestGoods.setGoods_name("iphone 14 pro max 128G  " + i);
            cdcTestGoods.setGoods_details("京东618大降价,买到就是赚  " + i);
            cdcTestGoods.setGoods_price(5899f);
            // Serialize once and reuse the payload for both the send and the log line.
            String payload = JacksonUtils.getString(cdcTestGoods);
            // get() blocks until the send has completed.
            SendResult<String, String> result = kafkaTemplate.send("test_kafka", payload).get();
            log.info("sendMessageSync =>  {},message: {}", result, payload);
        }
    }

查看mysql表,可以看到同步过来的数据。注意:kafka只支持insert,不支持update

 

文章来自个人专栏
个人文章
7 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0