将数据载入到 Kafka
现在让我们为我们的主题运行一个生成器(producer),然后向主题中发送一些数据!
在你的 Druid 目录中,运行下面的命令:
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
在你的 Kafka 的安装目录中,运行下面的命令。请将 {PATH_TO_DRUID} 替换为 Druid 的安装目录:
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
上面的控制台命令将会把示例消息载入到 Kafka 的 wikipedia 主题。 现在我们将会使用 Druid 的 Kafka 索引服务(indexing service)来将我们加载到 Kafka 中的消息导入到 Druid 中。
使用数据加载器(data loader)来加载数据
在 URL 中导航到 localhost:8888 页面,然后在控制台的顶部单击Load data
。
选择 Apache Kafka
然后单击 Connect data
。
输入 Kafka 的服务器地址为 localhost:9092
然后选择 wikipedia
为主题。
然后单击 Apply
。请确定你在界面中看到的数据只正确的。
一旦数据被载入后,你可以单击按钮 “Next: Parse data” 来进行下一步的操作。
Druid 的数据加载器将会为需要加载的数据确定正确的处理器。 在本用例中,我们成功的确定了需要处理的数据格式为 json
格式。 你可以在本页面中选择不同的数据处理器,通过选择不同的数据处理器,能够帮你更好的了解 Druid 是如何帮助你处理数据的。
当 json
格式的数据处理器被选择后,单击 Next: Parse time
来进行入下一个界面,在这个界面中你需要确定 timestamp 主键字段的的列。
Druid 要求所有数据必须有一个 timestamp 的主键字段(这个主键字段被定义和存储在 __time
)中。 如果你需要导入的数据没有时间字段的话,那么请选择 Constant value
。 在我们现在的示例中,数据载入器确定 time
字段是唯一可以被用来作为数据时间字段的数据。
单击 Next: ...
2 次,来跳过 Transform
和 Filter
步骤。 针对本教程来说,你并不需要对导入时间进行换行,所以你不需要调整 转换(Transform) 和 过滤器(Filter) 的配置。
配置摘要(schema) 是你对 dimensions 和 metrics 在导入数据的时候配置的地方。 这个界面显示的是当我们对数据在 Druid 中进行导入的时候,数据是如何在 Druid 中进行存储和表现的。 因为我们提交的数据集非常小,因此我们可以关闭 回滚(rollup) ,Rollup 的开关将不会在这个时候显示来供你选择。
如果你对当前的配置满意的话,单击 Next
来进入 Partition
步骤。在这个步骤中你可以定义数据是如何在段中进行分区的。
在这一步中,你可以调整你的数据是如何在段中进行分配的。 因为当前的数据集是一个非常小的数据库,我们在这一步不需要进行调制。
单击 Next: Tune
来进入性能配置页面。
Tune
这一步中一个 非常重要 的参数是 Use earliest offset
设置为 True
。 因为我们希望从流的开始来读取数据。 针对其他的配置,我们不需要进行修改,单击 Next: Publish
来进入 Publish
步骤。
让我们将数据源命名为 wikipedia-kafka
。
最后,单击 Next
来查看你的配置。
等到这一步的时候,你就可以看到如何使用数据导入来创建一个数据导入规范。 你可以随意的通过页面中的导航返回到前面的页面中对配置进行调整。 简单来说你可以对特性目录进行编辑,来查看编辑后的配置是如何对前面的步骤产生影响的。
当你对所有的配置都满意并且觉得没有问题的时候,单击 提交(Submit).
现在你需要到界面下半部分的任务视图(task view)中来查看通过 supervisor 创建的任务。
任务视图(task view)是被设置为自动刷新的,请等候 supervisor 来运行一个任务。
当一个任务启动运行后,这个任务将会对数据进行处理后导入到 Druid 中。
在页面的顶部,请导航到 Datasources
视图。
当 wikipedia-kafka
数据源成功显示,这个数据源中的数据就可以进行查询了。
请注意: 如果数据源在经过一段时间的等待后还是没有数据的话,那么很有可能是你的 supervisor 没有设置从 Kafka 的开头读取流数据(Tune
步骤中的配置)。
在数据源完成所有的数据导入后,你可以进入 Query
视图,来针对导入的数据源来运行 SQL 查询。
因为我们当前导入的数据库很小,你可以直接运行SELECT * FROM "wikipedia-kafka"
查询来查看数据导入的结果。
请访问 query tutorial 页面中的内容来了解如何针对一个新载入的数据如何运行查询。