searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

spark实践之使用TransportClient往ES写入数据

2023-10-31 02:20:15
36
0

问题导读:
1、如何使用spark连接ES?
2、如何使用TransportClient往ES批量导入数据?
3、在编写代码中踩了哪些坑?
4、ES中如何创建索引?


他们之前把数据导入ES是通过单机的程序导的,或者通过logstash从kafka往ES导,但当数据量很大的时候就会变得很低效,我这两天调研了一下把数据从hdfs直接通过spark导入ES的方法,当然,也适合spark Streaming程序;
这里指出版本号是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API变动比较频繁,因此最好参考官网文档。

连接ES的方法列举

  •     ES官网中给出了一个与spark连接的方法:是通过RDD可以直接调用 saveToEs 方法实现的;
  •     如果数据量不大的话,可以参考ES提供的RestFulAPI来实现;
  •     本文主要说明我使用的方法,通过 TransportClient 和 bulk 批处理操作来实现,这种方法比较适合数据量很大的情况,又可以灵活处理。


使用TransportClient往ES批量导入的方法

样例代码如下:   

[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
  * Author: wangxiaogang
  * Date: 2017/7/11
  * Email: Adamyuanyuan@gmail.com
  * hdfs 中的数据根据格式写到ES中
  */
object HdfsToEs {
  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
      System.exit(1)
    }
    val hdfsInputPath: String = args(0)
    println("hdfsInputPath: " + hdfsInputPath)
    val conf = new SparkConf().setAppName("HdfsToEs")
    val sc = new SparkContext(conf)
    //插入相关,索引 类型 id相关  以args方式提供接口。
    val esIndex: String = args(1)
    val esType: String = args(2)
    val partition: Int = args(3).toInt
    val bulkNum: Int = args(4).toInt
    val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
    val startTime: Long = System.currentTimeMillis
    println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
    hdfsRdd.foreachPartition {
      eachPa => {
        //        生产环境
        val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
          .put("transport.type", "netty3").put("http.type", "netty3").build
        val client: TransportClient = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        var bulkRequest: BulkRequestBuilder = null
        var flag = true
        var lineNum = 0
        for (eachLine <- eachPa) {
          // 每个bulk是10-15M为宜,数据封装为bulk后会较原来的数据略有增大,如果每行数据约为 1.5KB,则每 10000 行为一个bulk
          if (flag) {
            bulkRequest = client.prepareBulk
            flag = false
          }
          val strArray: Array[String] = eachLine.split("###")
          if (strArray.length != 25) {
            // 表示这行数据又问题,为了不影响整体,则跳过
            println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
          } else {
              // LinkedHashMap让ES中的数据变得有序
            val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
            val id: String = strArray(0)
            esDataMap.put("msisdn", id)
            // 数据合并后的格式为: msisdn###w0的前三###w1的前三###如果为空的话就是null...###w23的前三,共25列
            for (i <- 1 to 24) {
              val locTimesListStr = strArray(i)
              val esDataKey = "w" + (i - 1)
              if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
                esDataMap.put(esDataKey, "")
              } else {
                esDataMap.put(esDataKey, locTimesListStr)
              }
            }
            bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
            lineNum += 1
            if (lineNum % bulkNum == 0) {
              val endTime: Long = System.currentTimeMillis
              println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
              val bbq: BulkResponse = bulkRequest.execute.actionGet()
              flag = true
              if (bbq.hasFailures) {
                println("bbq.hasFailures: " + bbq.toString)
                bulkRequest.execute.actionGet
              }
            }
          }
        }
        if (bulkRequest != null) {
          bulkRequest.execute().actionGet()
        }
        client.close()
        val endTime: Long = System.currentTimeMillis
        println("ths time is: " + (endTime - startTime) / 1000 + "s ")
      }
    }
    sc.stop()
  }
}[/mw_shl_code]

踩坑说明:在编写代码中踩了如下坑:

  •     依赖冲突的问题: ES5.2与Spark1.6有如下包会产生依赖: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
  •     解决方案:
  •     通过 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依赖原因,然后在pom.xml中增加所需的相关依赖的最高版本;
  •     每个bulk的大小,根据网上的经验是10M-15M为宜,大概计算一下就好了;
  •     后来在单机测试通过,但在集群模式中还是会出现 netty4的依赖冲突:


[mw_shl_code=java,true]   17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
    java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
            at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
            at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
            at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
            at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
            at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
            at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
            at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
            at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
            at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]

有一种解决方案我没有尝试成功,就是在pom中将冲突的依赖包exclusions掉,各位感兴趣可以尝试,成功了麻烦告知我一下。参考链接:, 使用 maven-shade-plugin 工具打包。

上个方法我尝试几次不成功后,使用了比较暴力的方法,直接将ES的netty参数由netty4改成了netty3,

.put("transport.type", "netty3").put("http.type", "netty3").build

好了,打包好之后,程序就可以完美运行了。

ES中创建索引

就算如果ES中是自动创建索引的,也希望你能手动创建索引和字段属性,因为默认的字段属性是Text,ES会自动对它进行分词相关的操作,如果ES中存的字符串你不想让它被分隔的话,就用keyword替代为Text类型,命令如下:

[mw_shl_code=java,true]PUT  /weekend-20170718
{
  "settings" : {
    "index" : {
      "number_of_shards" : 5,
      "number_of_replicas" : 1,
      "refresh_interval" : "60s"
    },
  "index.routing.allocation.include.zone": "light"
  },
  "mappings": {
    "offline": {
      "properties": {
        "msisdn": {
          "type": "keyword"
        },"w0": {
          "type": "keyword"
        } ...后面省略
      }
    }
  }
}[/mw_shl_code]

创建好索引后检查一下:

GET /weekend-20170718/_mapping

集群中运行

这个比较简单,只需要注意以下几点就好了:

  •     使用jdk1.8版本;
  •     注意内存的申请,可能会出现跑了一段时间后,内存不够用导致程序退出的情况;
  •     观测好ES集群的状态,一段时间后,ES机器的GC比较高
  •     最好别一下子跑所有数据,分几批跑,这样就算出问题,只需要重跑那一部分就好了


数据:通过观察,导入的速度随着时间的增长呈下降趋势,整体来说,ES集群隔离的小集群共有五台物理机,共2.23亿条,751G的数据导入用了约4.5小时,平均速度为 45M/s, 1.38W条/s。

0条评论
作者已关闭评论
王****刚
4文章数
0粉丝数
王****刚
4 文章 | 0 粉丝
原创

spark实践之使用TransportClient往ES写入数据

2023-10-31 02:20:15
36
0

问题导读:
1、如何使用spark连接ES?
2、如何使用TransportClient往ES批量导入数据?
3、在编写代码中踩了哪些坑?
4、ES中如何创建索引?


他们之前把数据导入ES是通过单机的程序导的,或者通过logstash从kafka往ES导,但当数据量很大的时候就会变得很低效,我这两天调研了一下把数据从hdfs直接通过spark导入ES的方法,当然,也适合spark Streaming程序;
这里指出版本号是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API变动比较频繁,因此最好参考官网文档。

连接ES的方法列举

  •     ES官网中给出了一个与spark连接的方法:是通过RDD可以直接调用 saveToEs 方法实现的;
  •     如果数据量不大的话,可以参考ES提供的RestFulAPI来实现;
  •     本文主要说明我使用的方法,通过 TransportClient 和 bulk 批处理操作来实现,这种方法比较适合数据量很大的情况,又可以灵活处理。


使用TransportClient往ES批量导入的方法

样例代码如下:   

[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
  * Author: wangxiaogang
  * Date: 2017/7/11
  * Email: Adamyuanyuan@gmail.com
  * hdfs 中的数据根据格式写到ES中
  */
object HdfsToEs {
  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
      System.exit(1)
    }
    val hdfsInputPath: String = args(0)
    println("hdfsInputPath: " + hdfsInputPath)
    val conf = new SparkConf().setAppName("HdfsToEs")
    val sc = new SparkContext(conf)
    //插入相关,索引 类型 id相关  以args方式提供接口。
    val esIndex: String = args(1)
    val esType: String = args(2)
    val partition: Int = args(3).toInt
    val bulkNum: Int = args(4).toInt
    val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
    val startTime: Long = System.currentTimeMillis
    println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
    hdfsRdd.foreachPartition {
      eachPa => {
        //        生产环境
        val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
          .put("transport.type", "netty3").put("http.type", "netty3").build
        val client: TransportClient = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        var bulkRequest: BulkRequestBuilder = null
        var flag = true
        var lineNum = 0
        for (eachLine <- eachPa) {
          // 每个bulk是10-15M为宜,数据封装为bulk后会较原来的数据略有增大,如果每行数据约为 1.5KB,则每 10000 行为一个bulk
          if (flag) {
            bulkRequest = client.prepareBulk
            flag = false
          }
          val strArray: Array[String] = eachLine.split("###")
          if (strArray.length != 25) {
            // 表示这行数据又问题,为了不影响整体,则跳过
            println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
          } else {
              // LinkedHashMap让ES中的数据变得有序
            val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
            val id: String = strArray(0)
            esDataMap.put("msisdn", id)
            // 数据合并后的格式为: msisdn###w0的前三###w1的前三###如果为空的话就是null...###w23的前三,共25列
            for (i <- 1 to 24) {
              val locTimesListStr = strArray(i)
              val esDataKey = "w" + (i - 1)
              if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
                esDataMap.put(esDataKey, "")
              } else {
                esDataMap.put(esDataKey, locTimesListStr)
              }
            }
            bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
            lineNum += 1
            if (lineNum % bulkNum == 0) {
              val endTime: Long = System.currentTimeMillis
              println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
              val bbq: BulkResponse = bulkRequest.execute.actionGet()
              flag = true
              if (bbq.hasFailures) {
                println("bbq.hasFailures: " + bbq.toString)
                bulkRequest.execute.actionGet
              }
            }
          }
        }
        if (bulkRequest != null) {
          bulkRequest.execute().actionGet()
        }
        client.close()
        val endTime: Long = System.currentTimeMillis
        println("ths time is: " + (endTime - startTime) / 1000 + "s ")
      }
    }
    sc.stop()
  }
}[/mw_shl_code]

踩坑说明:在编写代码中踩了如下坑:

  •     依赖冲突的问题: ES5.2与Spark1.6有如下包会产生依赖: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
  •     解决方案:
  •     通过 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依赖原因,然后在pom.xml中增加所需的相关依赖的最高版本;
  •     每个bulk的大小,根据网上的经验是10M-15M为宜,大概计算一下就好了;
  •     后来在单机测试通过,但在集群模式中还是会出现 netty4的依赖冲突:


[mw_shl_code=java,true]   17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
    java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
            at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
            at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
            at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
            at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
            at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
            at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
            at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
            at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
            at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]

有一种解决方案我没有尝试成功,就是在pom中将冲突的依赖包exclusions掉,各位感兴趣可以尝试,成功了麻烦告知我一下。参考链接:, 使用 maven-shade-plugin 工具打包。

上个方法我尝试几次不成功后,使用了比较暴力的方法,直接将ES的netty参数由netty4改成了netty3,

.put("transport.type", "netty3").put("http.type", "netty3").build

好了,打包好之后,程序就可以完美运行了。

ES中创建索引

就算如果ES中是自动创建索引的,也希望你能手动创建索引和字段属性,因为默认的字段属性是Text,ES会自动对它进行分词相关的操作,如果ES中存的字符串你不想让它被分隔的话,就用keyword替代为Text类型,命令如下:

[mw_shl_code=java,true]PUT  /weekend-20170718
{
  "settings" : {
    "index" : {
      "number_of_shards" : 5,
      "number_of_replicas" : 1,
      "refresh_interval" : "60s"
    },
  "index.routing.allocation.include.zone": "light"
  },
  "mappings": {
    "offline": {
      "properties": {
        "msisdn": {
          "type": "keyword"
        },"w0": {
          "type": "keyword"
        } ...后面省略
      }
    }
  }
}[/mw_shl_code]

创建好索引后检查一下:

GET /weekend-20170718/_mapping

集群中运行

这个比较简单,只需要注意以下几点就好了:

  •     使用jdk1.8版本;
  •     注意内存的申请,可能会出现跑了一段时间后,内存不够用导致程序退出的情况;
  •     观测好ES集群的状态,一段时间后,ES机器的GC比较高
  •     最好别一下子跑所有数据,分几批跑,这样就算出问题,只需要重跑那一部分就好了


数据:通过观察,导入的速度随着时间的增长呈下降趋势,整体来说,ES集群隔离的小集群共有五台物理机,共2.23亿条,751G的数据导入用了约4.5小时,平均速度为 45M/s, 1.38W条/s。

文章来自个人专栏
小刚的大数据笔记
4 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0