Analysis of the Kafka Source FlinkKafkaConsumer

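This article walks through how FlinkKafkaConsumer, Flink's Kafka source, works at runtime: when its open() method is called, when its RuntimeContext is assigned, what its run() method does, and how the fetched records are written out to downstream tasks.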

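1. When is open() called?

FlinkKafkaConsumer is a RichFunction (through its base class FlinkKafkaConsumerBase, which extends RichParallelSourceFunction), so it has the lifecycle method open(). When does Flink call FlinkKafkaConsumer's open() method?

Before StreamTask runs the operators, it executes beforeInvoke(), which initializes the state of every operator in the chain and calls open() on it: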

	operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

initializeStateAndOpenOperators() loops over the operators, initializing the state of each one and then opening it:

	protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
		for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
			StreamOperator<?> operator = operatorWrapper.getStreamOperator();
			operator.initializeState(streamTaskStateInitializer);
			operator.open();
		}
	}

The operator for the Kafka source is StreamSource. Its open() method, inherited from AbstractUdfStreamOperator, is:

	public void open() throws Exception {
		super.open();
		FunctionUtils.openFunction(userFunction, new Configuration());
	}

FunctionUtils.openFunction() calls the open() method of the user function, provided that the function implements RichFunction:

	public static void openFunction(Function function, Configuration parameters) throws Exception{
		if (function instanceof RichFunction) {
			RichFunction richFunction = (RichFunction) function;
			richFunction.open(parameters);
		}
	}
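The call chain above can be sketched without any Flink dependency. The following is a minimal illustration, not Flink's code: MiniFunction, MiniRichFunction, MiniFunctionUtils and MiniOperator are hypothetical stand-ins that only mimic the "initialize state, then open, and open the user function only if it is rich" behaviour described in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
interface MiniFunction {}

interface MiniRichFunction extends MiniFunction {
    void open();
}

final class MiniFunctionUtils {
    private MiniFunctionUtils() {}

    // Same idea as FunctionUtils.openFunction(): only rich functions are opened.
    static void openFunction(MiniFunction function) {
        if (function instanceof MiniRichFunction) {
            ((MiniRichFunction) function).open();
        }
    }
}

final class MiniOperator {
    private final String name;
    private final MiniFunction userFunction;
    private final List<String> log;

    MiniOperator(String name, MiniFunction userFunction, List<String> log) {
        this.name = name;
        this.userFunction = userFunction;
        this.log = log;
    }

    void initializeState() {
        log.add(name + ".initializeState");
    }

    // The operator's open() delegates to the user function's open().
    void open() {
        log.add(name + ".open");
        MiniFunctionUtils.openFunction(userFunction);
    }
}

final class OpenLifecycleSketch {
    // Initializes and opens every operator and returns the order of the calls.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniRichFunction richSource = () -> log.add("function.open");
        MiniFunction plainFunction = new MiniFunction() {};
        List<MiniOperator> operators = Arrays.asList(
            new MiniOperator("source", richSource, log),
            new MiniOperator("map", plainFunction, log));
        for (MiniOperator operator : operators) {
            operator.initializeState();
            operator.open();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the rich function records a function.open entry; the plain function is skipped by the instanceof check, which is the same guard that openFunction() uses.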

2. When is the RuntimeContext assigned?

The call path is StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(). In its constructor, OperatorChain creates each StreamOperator through a StreamOperatorFactory. For the Kafka source the factory is SimpleOperatorFactory, whose createStreamOperator() method calls the operator's setup() method:

	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
		if (operator instanceof AbstractStreamOperator) {
			((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
		}
		if (operator instanceof SetupableStreamOperator) {
			((SetupableStreamOperator) operator).setup(
				parameters.getContainingTask(),
				parameters.getStreamConfig(),
				parameters.getOutput());
		}
		return (T) operator;
	}

The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:

	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		super.setup(containingTask, config, output);
		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
	}

FunctionUtils.setFunctionRuntimeContext() hands the RuntimeContext to the user function. The context itself is created in AbstractStreamOperator.setup() and is a StreamingRuntimeContext:

		this.runtimeContext = new StreamingRuntimeContext(
			environment,
			environment.getAccumulatorRegistry().getUserMap(),
			getMetricGroup(),
			getOperatorID(),
			getProcessingTimeService(),
			null,
			environment.getExternalResourceInfoProvider());
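The ordering matters: setup() must run before open(), otherwise the user function has no context to read. Here is a minimal sketch of that ordering, assuming hypothetical classes MiniRuntimeContext, MiniRichSource and MiniUdfOperator that are simplified stand-ins rather than Flink's types.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
final class MiniRuntimeContext {
    private final String taskName;
    private final int subtaskIndex;

    MiniRuntimeContext(String taskName, int subtaskIndex) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
    }

    String getTaskName() {
        return taskName;
    }

    int getSubtaskIndex() {
        return subtaskIndex;
    }
}

abstract class MiniRichSource {
    private MiniRuntimeContext runtimeContext;

    void setRuntimeContext(MiniRuntimeContext context) {
        this.runtimeContext = context;
    }

    // Fails loudly if the context is requested before setup() has injected it.
    MiniRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    abstract void open();
}

final class MiniUdfOperator {
    private final MiniRichSource userFunction;

    MiniUdfOperator(MiniRichSource userFunction) {
        this.userFunction = userFunction;
    }

    // setup(): create the context and pass it on to the user function.
    void setup(String taskName, int subtaskIndex) {
        userFunction.setRuntimeContext(new MiniRuntimeContext(taskName, subtaskIndex));
    }

    void open() {
        userFunction.open();
    }
}

final class RuntimeContextSketch {
    // Runs setup() and then open(), returning what the function saw in its context.
    static String describe() {
        final String[] seen = new String[1];
        MiniRichSource source = new MiniRichSource() {
            @Override
            void open() {
                MiniRuntimeContext context = getRuntimeContext();
                seen[0] = context.getTaskName() + "#" + context.getSubtaskIndex();
            }
        };
        MiniUdfOperator operator = new MiniUdfOperator(source);
        operator.setup("kafka-source", 0);
        operator.open();
        return seen[0];
    }

    // Calling open() without setup() leaves the function without a context.
    static boolean openBeforeSetupFails() {
        MiniRichSource source = new MiniRichSource() {
            @Override
            void open() {
                getRuntimeContext();
            }
        };
        try {
            new MiniUdfOperator(source).open();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println(openBeforeSetupFails());
    }
}
```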

3. The run() method of FlinkKafkaConsumer

Flink calls FlinkKafkaConsumer's run() method to produce data. run() proceeds as follows:

① Create a KafkaFetcher to pull the data

		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

② KafkaFetcher.runFetchLoop() starts a KafkaConsumerThread that polls Kafka in a loop. KafkaConsumerThread fetches records through a KafkaConsumer and hands them to the Handover:

				if (records == null) {
					try {
						records = consumer.poll(pollTimeout);
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					handover.produce(records);
					records = null;
				}
				catch (Handover.WakeupException e) {
					// fall through the loop
				}

KafkaFetcher takes the fetched Kafka records from the Handover:

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					partitionConsumerRecordsHandler(partitionRecords, partition);
				}
			}
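The hand-over between the polling thread and the emitting thread is essentially a one-slot blocking exchange that also forwards errors. The sketch below illustrates that pattern under simplifying assumptions: MiniHandover is a hypothetical class written for this example and is not Flink's Handover (it has no wakeup or close handling), and the end of the demo input is signalled by reporting an error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical single-slot hand-over; NOT Flink's actual Handover class.
final class MiniHandover<T> {
    private final Object lock = new Object();
    private T next;           // the single slot
    private Throwable error;  // failure reported by the producing thread

    // Called by the polling thread; blocks while the previous batch has not been taken.
    void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll();
        }
    }

    // Called by the emitting thread; blocks until a batch is available and
    // rethrows a failure reported by the producer once all batches are drained.
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (next != null) {
                T result = next;
                next = null;
                lock.notifyAll();
                return result;
            }
            throw new RuntimeException("producer failed", error);
        }
    }

    void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            lock.notifyAll();
        }
    }
}

final class HandoverSketch {
    // One thread produces two batches and then an error; the caller drains them in order.
    static List<String> run() {
        final MiniHandover<List<String>> handover = new MiniHandover<>();
        Thread poller = new Thread(() -> {
            try {
                handover.produce(Arrays.asList("a", "b"));
                handover.produce(Arrays.asList("c"));
                handover.reportError(new IllegalStateException("end of demo input"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true);
        poller.start();

        List<String> received = new ArrayList<>();
        try {
            while (true) {
                received.addAll(handover.pollNext());
            }
        } catch (RuntimeException e) {
            received.add("error: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because pollNext() checks the slot before the error, a batch that is already waiting is always delivered before the failure surfaces in the consuming thread.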

③ Send the data to the next operator through the Output<StreamRecord<T>> held by the SourceContext

		public void collect(T element) {
			synchronized (lock) {
				output.collect(reuse.replace(element));
			}
		}

The SourceContext is created in StreamSource.run() through StreamSourceContexts.getSourceContext(). The Output<StreamRecord<T>> is the return value of OperatorChain.createOutputCollector():

		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
			@SuppressWarnings("unchecked")
			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

			allOutputs.add(new Tuple2<>(output, outputEdge));
		}

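With a single output, the collector is the RecordWriterOutput itself. With several outputs and output selectors configured, it is a CopyingDirectedOutput or a DirectedOutput; without selectors, a broadcasting collector is used instead.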

④ With a single RecordWriterOutput, records are written through its RecordWriter member. The RecordWriter is created by StreamTask.createRecordWriterDelegate(). RecordWriterDelegate is a delegate that holds the RecordWriter instance (or instances) internally:

	public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
			configuration,
			environment);
		if (recordWrites.size() == 1) {
			return new SingleRecordWriter<>(recordWrites.get(0));
		} else if (recordWrites.size() == 0) {
			return new NonRecordWriter<>();
		} else {
			return new MultipleRecordWriters<>(recordWrites);
		}
	}

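	private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());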

		for (int i = 0; i < outEdgesInOrder.size(); i++) {
			StreamEdge edge = outEdgesInOrder.get(i);
			recordWriters.add(
				createRecordWriter(
					edge,
					i,
					environment,
					environment.getTaskInfo().getTaskName(),
					edge.getBufferTimeout()));
		}
		return recordWriters;
	}

outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.
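The three-way choice made in createRecordWriterDelegate() (exactly one writer, no writer, several writers) can be illustrated with a small stand-alone sketch. MiniRecordWriter, MiniWriterDelegate and MiniDelegates are hypothetical names invented for this example; the sketch only models broadcasting an event to every writer held by the delegate, not Flink's full RecordWriterDelegate interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
interface MiniRecordWriter {
    void broadcastEvent(String event);
}

interface MiniWriterDelegate {
    String kind();

    void broadcastEvent(String event);
}

final class MiniDelegates {
    private MiniDelegates() {}

    // Picks the delegate variant from the number of writers, one per output edge.
    static MiniWriterDelegate create(final List<MiniRecordWriter> writers) {
        if (writers.size() == 1) {
            final MiniRecordWriter only = writers.get(0);
            return new MiniWriterDelegate() {
                @Override
                public String kind() {
                    return "single";
                }

                @Override
                public void broadcastEvent(String event) {
                    only.broadcastEvent(event);
                }
            };
        } else if (writers.isEmpty()) {
            return new MiniWriterDelegate() {
                @Override
                public String kind() {
                    return "none";
                }

                @Override
                public void broadcastEvent(String event) {
                    // no outputs, nothing to do
                }
            };
        } else {
            return new MiniWriterDelegate() {
                @Override
                public String kind() {
                    return "multiple";
                }

                @Override
                public void broadcastEvent(String event) {
                    for (MiniRecordWriter writer : writers) {
                        writer.broadcastEvent(event);
                    }
                }
            };
        }
    }
}

final class WriterDelegateSketch {
    private static List<MiniRecordWriter> writers(int count, final List<String> sink) {
        List<MiniRecordWriter> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int index = i;
            result.add(event -> sink.add(index + ":" + event));
        }
        return result;
    }

    static String kindFor(int numberOfWriters) {
        return MiniDelegates.create(writers(numberOfWriters, new ArrayList<String>())).kind();
    }

    // Number of writers that received one broadcast event.
    static int deliveries(int numberOfWriters) {
        List<String> sink = new ArrayList<>();
        MiniDelegates.create(writers(numberOfWriters, sink)).broadcastEvent("barrier");
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println(kindFor(0) + " " + kindFor(1) + " " + kindFor(3));
    }
}
```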

When the RecordWriter is created, the isBroadcast() method of the StreamEdge's StreamPartitioner outputPartitioner decides whether it is a BroadcastRecordWriter or a ChannelSelectorRecordWriter:

	public RecordWriter<T> build(ResultPartitionWriter writer) {
		if (selector.isBroadcast()) {
			return new BroadcastRecordWriter<>(writer, timeout, taskName);
		} else {
			return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
		}
	}

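If no partitioner was set explicitly, outputPartitioner is chosen according to whether the upstream and downstream nodes have the same parallelism: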

			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
				partitioner = new ForwardPartitioner<Object>();
			} else if (partitioner == null) {
				partitioner = new RebalancePartitioner<Object>();
			}
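To make the effect of this default concrete, here is a small sketch of the selection rule together with two toy channel selectors. All class names are hypothetical and written for this example; in particular, the round-robin selector here always starts at channel 0, which is a simplification and not a statement about how Flink's RebalancePartitioner picks its first channel.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
interface MiniPartitioner {
    int selectChannel(int numberOfChannels);
}

// Forward: every record goes to the one local downstream channel.
final class MiniForwardPartitioner implements MiniPartitioner {
    @Override
    public int selectChannel(int numberOfChannels) {
        return 0;
    }
}

// Rebalance: records are spread round-robin over all downstream channels.
final class MiniRebalancePartitioner implements MiniPartitioner {
    private int nextChannel = -1;

    @Override
    public int selectChannel(int numberOfChannels) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}

final class PartitionerChoiceSketch {
    // Keeps an explicitly set partitioner; otherwise forward for equal
    // parallelism and rebalance for differing parallelism.
    static MiniPartitioner choose(MiniPartitioner explicit, int upstreamParallelism, int downstreamParallelism) {
        if (explicit != null) {
            return explicit;
        }
        if (upstreamParallelism == downstreamParallelism) {
            return new MiniForwardPartitioner();
        }
        return new MiniRebalancePartitioner();
    }

    public static void main(String[] args) {
        MiniPartitioner partitioner = choose(null, 2, 3);
        StringBuilder channels = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            channels.append(partitioner.selectChannel(3)).append(' ');
        }
        System.out.println(channels.toString().trim());
    }
}
```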

Both BroadcastRecordWriter and ChannelSelectorRecordWriter ultimately call flush() on their ResultPartitionWriter targetPartition member to output the data. The ResultPartitionWriter instances are produced by ConsumableNotifyingResultPartitionWriterDecorator.decorate(). Depending on the corresponding ResultPartitionDeploymentDescriptor, each writer is either wrapped in a ConsumableNotifyingResultPartitionWriterDecorator or is the passed-in partition writer unchanged. The decorator makes the data available to the downstream node right away, using ResultPartitionConsumableNotifier for the notification:

	public static ResultPartitionWriter[] decorate(
			Collection<ResultPartitionDeploymentDescriptor> descs,
			ResultPartitionWriter[] partitionWriters,
			TaskActions taskActions,
			JobID jobId,
			ResultPartitionConsumableNotifier notifier) {

		ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
		int counter = 0;
		for (ResultPartitionDeploymentDescriptor desc : descs) {
			if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
				consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
					taskActions,
					jobId,
					partitionWriters[counter],
					notifier);
			} else {
				consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
			}
			counter++;
		}
		return consumableNotifyingPartitionWriters;
	}

partitionWriters are created through NettyShuffleEnvironment.createResultPartitionWriters() -> ResultPartitionFactory.create(). A ResultPartition writes its output through its ResultSubpartition[] subpartitions member, which is populated in ResultPartitionFactory.createSubpartitions():

	private void createSubpartitions(
			ResultPartition partition,
			ResultPartitionType type,
			BoundedBlockingSubpartitionType blockingSubpartitionType,
			ResultSubpartition[] subpartitions) {
		// Create the subpartitions.
		if (type.isBlocking()) {
			initializeBoundedBlockingPartitions(
				subpartitions,
				partition,
				blockingSubpartitionType,
				networkBufferSize,
				channelManager);
		} else {
			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = new PipelinedSubpartition(i, partition);
			}
		}
	}

For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.

4. Writing the data out

4.1 Notification through ResultPartitionConsumableNotifier

The ResultPartitionConsumableNotifier is created in TaskExecutor.associateWithJobManager():

	private JobTable.Connection associateWithJobManager(
			JobTable.Job job,
			ResourceID resourceID,
			JobMasterGateway jobMasterGateway) {
		......

		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
			jobMasterGateway,
			getRpcService().getExecutor(),
			taskManagerConfiguration.getTimeout());

		......
	}

RpcResultPartitionConsumableNotifier makes a remote call to JobMaster.scheduleOrUpdateConsumers(), passing the ResultPartitionID partitionId.

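4.1.1 scheduleOrUpdateConsumers() in JobMaster

JobMaster notifies the downstream consuming operators through ExecutionGraph.scheduleOrUpdateConsumers().

Two pieces of code are key here:

① The producer and consumer information for the partition is looked up in the Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions member of this operator's ExecutionVertex. That information is stored in an IntermediateResultPartition: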

	void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

		.......

		final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());

		.......

		if (partition.getIntermediateResult().getResultType().isPipelined()) {
			// Schedule or update receivers of this partition
			execution.scheduleOrUpdateConsumers(partition.getConsumers());
		}
		else {
			throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
					"pipelined partitions.");
		}
	}

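The consumers, List<List<ExecutionEdge>> allConsumers, are taken from the IntermediateResultPartition.

The task execution is then taken from the Execution currentExecution of each ExecutionEdge's ExecutionVertex target.

② Execution.sendUpdatePartitionInfoRpcCall() calls TaskExecutor.updatePartitions() over RPC so that the downstream consumer task learns about the partition: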

	private void sendUpdatePartitionInfoRpcCall(
			final Iterable<PartitionInfo> partitionInfos) {

		final LogicalSlot slot = assignedResource;

		if (slot != null) {
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

			CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

			updatePartitionsResultFuture.whenCompleteAsync(
				(ack, failure) -> {
					// fail if there was a failure
					if (failure != null) {
						fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
							"] on TaskManager " + taskManagerLocation + " failed", failure));
					}
				}, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
		}
	}

4.1.2 updatePartitions() in TaskExecutor

TaskExecutor.updatePartitions() updates the partition information. If an InputChannel was previously unknown, it is replaced with a concrete channel in SingleInputGate.updateInputChannel():

	public void updateInputChannel(
			ResourceID localLocation,
			NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
		synchronized (requestLock) {
			if (closeFuture.isDone()) {
				// There was a race with a task failure/cancel
				return;
			}

			IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

			InputChannel current = inputChannels.get(partitionId);

			if (current instanceof UnknownInputChannel) {
				UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
				boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
				InputChannel newChannel;
				if (isLocal) {
					newChannel = unknownChannel.toLocalInputChannel();
				} else {
					RemoteInputChannel remoteInputChannel =
						unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
					remoteInputChannel.assignExclusiveSegments();
					newChannel = remoteInputChannel;
				}
				LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

				inputChannels.put(partitionId, newChannel);
				channels[current.getChannelIndex()] = newChannel;

				if (requestedPartitionsFlag) {
					newChannel.requestSubpartition(consumedSubpartitionIndex);
				}

				for (TaskEvent event : pendingEvents) {
					newChannel.sendTaskEvent(event);
				}

				if (--numberOfUninitializedChannels == 0) {
					pendingEvents.clear();
				}
			}
		}
	}
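The core of updateInputChannel() is a guarded state transition: only a channel that is still unknown gets resolved, and it becomes local or remote depending on where the producer runs. A minimal sketch of just that decision follows, assuming hypothetical MiniChannelKind and MiniInputGate types and plain strings in place of Flink's ResourceID and partition IDs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
enum MiniChannelKind { UNKNOWN, LOCAL, REMOTE }

final class MiniInputGate {
    private final Object requestLock = new Object();
    private final Map<String, MiniChannelKind> inputChannels = new HashMap<>();
    private final String localTaskManager;

    MiniInputGate(String localTaskManager) {
        this.localTaskManager = localTaskManager;
    }

    void addUnknownChannel(String partitionId) {
        synchronized (requestLock) {
            inputChannels.put(partitionId, MiniChannelKind.UNKNOWN);
        }
    }

    // Resolves an unknown channel; returns false if there was nothing to update.
    boolean updateInputChannel(String partitionId, String producerTaskManager) {
        synchronized (requestLock) {
            if (inputChannels.get(partitionId) != MiniChannelKind.UNKNOWN) {
                // Already resolved, or the partition does not belong to this gate.
                return false;
            }
            boolean isLocal = localTaskManager.equals(producerTaskManager);
            inputChannels.put(partitionId, isLocal ? MiniChannelKind.LOCAL : MiniChannelKind.REMOTE);
            return true;
        }
    }

    MiniChannelKind kindOf(String partitionId) {
        synchronized (requestLock) {
            return inputChannels.get(partitionId);
        }
    }
}

final class InputChannelUpdateSketch {
    public static void main(String[] args) {
        MiniInputGate gate = new MiniInputGate("tm-1");
        gate.addUnknownChannel("p0");
        gate.addUnknownChannel("p1");
        gate.updateInputChannel("p0", "tm-1");
        gate.updateInputChannel("p1", "tm-2");
        System.out.println(gate.kindOf("p0") + " " + gate.kindOf("p1"));
    }
}
```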

4.2 Writing through PipelinedSubpartition

Records are first written to the buffer queue ArrayDeque<BufferConsumer> buffers. The reader is then notified through PipelinedSubpartitionView readView.notifyDataAvailable() -> BufferAvailabilityListener availabilityListener.notifyDataAvailable().
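The "queue first, then notify the listener" idea can be sketched as follows. MiniPipelinedSubpartition and MiniAvailabilityListener are hypothetical classes for illustration, and the notification rule used here (notify only when the queue goes from empty to non-empty, or when a view is attached to a non-empty queue) is a simplifying assumption, not a description of Flink's exact conditions.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
interface MiniAvailabilityListener {
    void notifyDataAvailable();
}

final class MiniPipelinedSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private MiniAvailabilityListener listener;

    // Attaches the reader; data queued earlier triggers an immediate notification.
    void createReadView(MiniAvailabilityListener newListener) {
        boolean hasData;
        synchronized (buffers) {
            listener = newListener;
            hasData = !buffers.isEmpty();
        }
        if (hasData) {
            newListener.notifyDataAvailable();
        }
    }

    // Queues a buffer and notifies the reader when the queue was empty before.
    void add(String buffer) {
        MiniAvailabilityListener toNotify = null;
        synchronized (buffers) {
            boolean wasEmpty = buffers.isEmpty();
            buffers.add(buffer);
            if (wasEmpty) {
                toNotify = listener;
            }
        }
        // Notify outside the lock so the listener cannot block writers.
        if (toNotify != null) {
            toNotify.notifyDataAvailable();
        }
    }

    String poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }
}

final class PipelinedSubpartitionSketch {
    // Returns the notification count after attaching the view, after a second
    // write, and after the queue was drained and written to again.
    static int[] run() {
        final int[] notifications = {0};
        MiniPipelinedSubpartition subpartition = new MiniPipelinedSubpartition();
        subpartition.add("early");                                   // no reader attached yet
        subpartition.createReadView(() -> notifications[0]++);       // pending data is announced
        int afterView = notifications[0];
        subpartition.add("second");                                  // queue was not empty
        int afterSecondWrite = notifications[0];
        subpartition.poll();
        subpartition.poll();
        subpartition.add("third");                                   // empty to non-empty again
        return new int[] {afterView, afterSecondWrite, notifications[0]};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```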

4.2.1 When is the BufferAvailabilityListener created?

① When TaskManagerServices creates the ShuffleEnvironment, the call chain NettyShuffleServiceFactory.createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer.initChannel() -> NettyProtocol.getServerChannelHandlers() yields the Netty server-side handler PartitionRequestServerHandler:

	public ChannelHandler[] getServerChannelHandlers() {
		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
			partitionProvider,
			taskEventPublisher,
			queueOfPartitionQueues);

		return new ChannelHandler[] {
			messageEncoder,
			new NettyMessage.NettyMessageDecoder(),
			serverHandler,
			queueOfPartitionQueues
		};
	}

② When PartitionRequestServerHandler receives a PartitionRequest message from a client, it creates a CreditBasedSequenceNumberingViewReader and registers it as the availability listener through requestSubpartitionView() -> ResultPartitionManager.createSubpartitionView() -> ResultPartition.createSubpartitionView().

③ CreditBasedSequenceNumberingViewReader.notifyDataAvailable() calls PartitionRequestQueue.notifyReaderNonEmpty(), which notifies the downstream operator:

	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		// The notification might come from the same thread. For the initial writes this
		// might happen before the reader has set its reference to the view, because
		// creating the queue and the initial notification happen in the same method call.
		// This can be resolved by separating the creation of the view and allowing
		// notifications.

		// TODO This could potentially have a bad performance impact as in the
		// worst case (network consumes faster than the producer) each buffer
		// will trigger a separate event loop task being scheduled.
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}

Article link: http://ybzwz.com/article/iiddji.html