如何使用spark-core实现广度优先搜索

如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

创新互联服务项目包括石阡网站建设、石阡网站制作、石阡网页制作以及石阡网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,石阡网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到石阡省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

需求描述

数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码

package com.pxu.spark.core

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 *  pxu
 *  2021-01-29 16:57
 */
object FindIpRel {


  def main(args: Array[String]): Unit = {

    val srcIp = args(0) // 源ip
    val dstIp = args(1) // 目标ip
    val depth = args(2).toInt //搜索深度
    val resPath = args(3) //搜索结果的输出位置

    val conf = new SparkConf().setAppName("findIpRel")
    val sc = new SparkContext(conf)


    /**
     * 从数据源中构建原始rdd,每一行的数据形式为a,b
     */

    val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv")

    /**
     * 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b)
     * 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区
     * 为后面的join操作,做一些优化
     */
    val base = ori.map(a => {
      val tmpArr = a.split(",")
      (tmpArr(0), tmpArr(1))
    }).distinct().partitionBy(new HashPartitioner(10))


    /**
     * 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path))
     * 在查找过程中,发现了搜索结果后,就会将其并入到res中
     */
    var res = sc.makeRDD[(String,List[String])](List())

    /**
     * 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的,
     * 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索
     * 路径上其他的ip
     */
    var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1)))

    for(i <- 2 to depth){

      /**
       * 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c))
       * 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了
       * 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点,
       *   此时RDD中的每一个元素的格式应该是(c,(List(ip on path))
       */
      val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1))

      /**
       * 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等
       */
      val success = tmp.filter(a => a._1.equals(dstIp))

      /**
       * 将成功搜索的数据合并到res中
       */
      res = res.union(success)
      
      /**
       * 更新iteration
       */
      iteration = tmp.subtract(success)

    }
    
    /**
     * 将成功搜索的路径并入到res中
     */
    res.union(iteration.filter(a => a._1.equals(dstIp)))

    /**
     * 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path)
     */
    val finalResult = res.map(a => a._2 :+ a._1)

    finalResult.saveAsTextFile(resPath)

  }

}

关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。


新闻名称:如何使用spark-core实现广度优先搜索
当前网址:http://ybzwz.com/article/pjijee.html