第17课:SparkStreaming资源动态申请和动态控制消费速率原理剖析

本期内容:

发展壮大离不开广大客户长期以来的信赖与支持,我们将始终秉承“诚信为本、服务至上”的服务理念,坚持“二合一”的优良服务模式,真诚服务每家企业,认真做好每个细节,不断完善自我,成就企业,实现共赢。行业涉及成都航空箱等,在重庆网站建设公司全网营销推广、WAP手机网站、VI设计、软件开发等项目上具有丰富的设计经验。

  • Spark Streaming资源动态分配

  • Spark Streaming动态控制消费速率

为什么需要动态?

  • Spark默认情况下粗粒度的,先分配好资源再计算。而Spark Streaming有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。

  • Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。

  • Spark Streaming资源动态调整的时候会面临挑战:

  • Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。

Spark Streaming资源动态申请 

1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。

// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && //参数配置是否开启资源动态分配_conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {    None
  }
_executorAllocationManager.foreach(_.start())
  1. ExecutorAllocationManager: 有定时器会不断的去扫描Executor的情况,正在运行的Stage,要运行在不同的Executor中,要么增加Executor或者减少。

  2. ExecutorAllocationManager中schedule方法会被周期性触发进行资源动态调整。

/** * This is called at a fixed interval to regulate the number of pending executor requests * and number of executors running. * * First, adjust our requested executors based on the add time and our current needs. * Then, if the remove time for an existing executor has expired, kill the executor. * * This is factored out into its own method for testing. */private def schedule(): Unit = synchronized {  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }    !expired
  }
}
  1. 在ExecutorAllocationManager中会在线程池中定时器会不断的运行schedule.

/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = {
  listenerBus.addListener(listener)  val scheduleTask = new Runnable() {    override def run(): Unit = {      try {
        schedule()
      } catch {        case ct: ControlThrowable =>
          throw ct        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }// intervalMillis定时器触发时间
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

动态控制消费速率: Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。

动态控制消费速率的原理可参考论文 Adaptive Stream Processing using Dynamic Batch Sizing

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


分享题目:第17课:SparkStreaming资源动态申请和动态控制消费速率原理剖析
文章转载:http://ybzwz.com/article/pcpogj.html