kafka ksql && docker 安装试用
docker 的安装不需要进行多描述了,直接yum 或者源码编译也可以
git clone :confluentinc/ksql.git cd ksql/docs/quickstart/ docker-compose up -d
docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
创建测试使用的表 CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED'); DESCRIBE pageviews_original; CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON'); DESCRIBE users_original; 显示系统的stream SHOW STREAMS; Stream Name | Kafka Topic | Format ---------------------------------------------- PAGEVIEWS_ORIGINAL | pageviews | DELIMITED 显示系统的表 SHOW TABLES; Table Name | Kafka Topic | Format | Windowed -------------------------------------------------- USERS_ORIGINAL | users | JSON | false 查询数据 SELECT pageid FROM pageviews_original LIMIT 3; Page_66 Page_17 Page_67 LIMIT reached for the partition. Query terminated
创建数据 CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; DESCRIBE pageviews_female; Field | Type ---------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) USERID | VARCHAR(STRING) PAGEID | VARCHAR(STRING) REGIONID | VARCHAR(STRING) GENDER | VARCHAR(STRING) 查询数据 SELECT * FROM pageviews_female; 1504252783201 | User_5 | User_5 | Page_49 | Region_8 | FEMALE 1504252783525 | User_6 | User_6 | Page_39 | Region_6 | FEMALE 1504252783813 | User_5 | User_5 | Page_15 | Region_8 | FEMALE 1504252789309 | User_6 | User_6 | Page_90 | Region_5 | FEMALE 1504252792424 | User_8 | User_8 | Page_40 | Region_1 | FEMALE 1504252796605 | User_4 | User_4 | Page_12 | Region_8 | FEMALE 1504252797405 | User_3 | User_3 | Page_22 | Region_3 | FEMALE 1504252802099 | User_6 | User_6 | Page_43 | Region_7 | FEMALE
stream、table、like 、 join、limit、tumbling window、简单聚合函数,目前来说还是比较强大的