Time & Watermark(2): Propagation and Processing of Watermarks


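The previous post, Time & Watermark(1): Watermark Generation in Flink, introduced the three notions of time and the concept of a watermark, along with the watermark generation scheme Flink uses for each of the three notions of time. This post digs further into watermarks and analyzes how they take effect inside Flink.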





In Flink, a watermark is a special StreamElement that flows through the stream alongside the data records and carries nothing but a timestamp:

```java
public final class Watermark extends StreamElement {

	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;

	// constructor, getter, equals, hashCode & toString
}
```




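Watermarks leave an operator through its Output. The outputs inside an operator chain implement WatermarkGaugeExposingOutput, which exposes the most recently emitted watermark as a metric gauge. RecordWriterOutput, the output that sends elements over the network to downstream tasks, emits a watermark as follows:

```java
// OperatorChain.class, line 559
public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
	Gauge<Long> getWatermarkGauge();
}

// RecordWriterOutput.class, line 115
// emitWatermark(Watermark) method
public void emitWatermark(Watermark mark) {
	watermarkGauge.setCurrentWatermark(mark.getTimestamp());
	serializationDelegate.setInstance(mark);

	if (streamStatusProvider.getStreamStatus().isActive()) {
		try {
			recordWriter.broadcastEmit(serializationDelegate);
		} catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}
}
```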

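The method does two things:

  1. It updates the Output's watermark progress (the watermark gauge) to the timestamp of the watermark about to be emitted.
  2. If the task's stream status is ACTIVE, it broadcasts the watermark to all downstream operators; while the task is IDLE, the watermark is not sent.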
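To make the two steps concrete, here is a minimal, self-contained Java model of this emission path. It is not Flink code: `SketchOutput`, its `channels` list and the `Status` enum are hypothetical stand-ins for RecordWriterOutput, its RecordWriter and StreamStatus. It shows that the gauge always records the latest watermark, while the broadcast is skipped as long as the task is idle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of RecordWriterOutput#emitWatermark; not Flink code.
class EmitWatermarkSketch {

    enum Status { ACTIVE, IDLE }

    static class SketchOutput {
        // stands in for the watermark gauge exposed by the output
        long currentWatermarkGauge = Long.MIN_VALUE;
        // stands in for the task's StreamStatusProvider
        Status status = Status.ACTIVE;
        // one list per downstream channel, standing in for the RecordWriter's channels
        final List<List<Long>> channels = new ArrayList<>();

        SketchOutput(int numChannels) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        void emitWatermark(long watermark) {
            // step 1: always update the gauge
            currentWatermarkGauge = watermark;
            // step 2: broadcast to every downstream channel, but only while ACTIVE
            if (status == Status.ACTIVE) {
                for (List<Long> channel : channels) {
                    channel.add(watermark);
                }
            }
        }
    }

    public static void main(String[] args) {
        SketchOutput out = new SketchOutput(3);
        out.emitWatermark(100L);
        out.status = Status.IDLE;
        out.emitWatermark(200L); // blocked: the gauge moves, nothing is sent
        System.out.println(out.currentWatermarkGauge + " " + out.channels);
        // prints: 200 [[100], [100], [100]]
    }
}
```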



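StreamStatus (ACTIVE or IDLE) exists to handle two situations:

  1. The operator has only one upstream operator; that upstream operator has gone idle, but its periodic timer task keeps emitting watermarks.
  2. The operator has several upstream operators, and one of them goes idle, or resumes from idle with a watermark that lags far behind those of the other upstream operators.

The StreamStatus Javadoc describes how downstream tasks react: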

Stream Status elements received at downstream tasks also affect and control how their operators process and advance their watermarks. The below describes the effects (the logic is implemented as a {@link StatusWatermarkValve} which downstream tasks should use for such purposes):

  • Since source tasks guarantee that no records will be emitted between a {@link StreamStatus#IDLE} and {@link StreamStatus#ACTIVE}, downstream tasks can always safely process and propagate records through their operator chain when they receive them, without the need to check whether or not the task is currently idle or active. However, for watermarks, since there may be watermark generators that might produce watermarks anywhere in the middle of topologies regardless of whether there are input data at the operator, the current status of the task must be checked before forwarding watermarks emitted from an operator. If the status is actually idle, the watermark must be blocked.

  • For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle, or has resumed to be active but its watermark is behind the overall min watermark of the operator, should not be accounted for when deciding whether or not to advance the watermark and propagated through the operator chain.

Note that to notify downstream tasks that a source task is permanently closed and will no longer send any more elements, the source should still send a {@link Watermark#MAX_WATERMARK} instead of {@link StreamStatus#IDLE}. Stream Status elements only serve as markers for temporary status.


Downstream tasks implement this logic in StatusWatermarkValve:

```java
public class StatusWatermarkValve {

	private final DataOutput output;

	// ------------------------------------------------------------------------
	//	Runtime state for watermark & stream status output determination
	// ------------------------------------------------------------------------

	/**
	 * Array of current status of all input channels. Changes as watermarks & stream statuses are
	 * fed into the valve.
	 */
	private final InputChannelStatus[] channelStatuses;

	/** The last watermark emitted from the valve. */
	private long lastOutputWatermark;

	/** The last stream status emitted from the valve. */
	private StreamStatus lastOutputStreamStatus;

	/**
	 * Returns a new {@code StatusWatermarkValve}.
	 *
	 * @param numInputChannels the number of input channels that this valve will need to handle
	 * @param output the customized output handler for the valve
	 */
	public StatusWatermarkValve(int numInputChannels, DataOutput output) {
		checkArgument(numInputChannels > 0);
		this.channelStatuses = new InputChannelStatus[numInputChannels];
		for (int i = 0; i < numInputChannels; i++) {
			channelStatuses[i] = new InputChannelStatus();
			channelStatuses[i].watermark = Long.MIN_VALUE;
			channelStatuses[i].streamStatus = StreamStatus.ACTIVE;
			channelStatuses[i].isWatermarkAligned = true;
		}

		this.output = checkNotNull(output);

		this.lastOutputWatermark = Long.MIN_VALUE;
		this.lastOutputStreamStatus = StreamStatus.ACTIVE;
	}

	/**
	 * Feed a {@link Watermark} into the valve. If the input triggers the valve to output a new Watermark,
	 * {@link DataOutput#emitWatermark(Watermark)} will be called to process the new Watermark.
	 *
	 * @param watermark the watermark to feed to the valve
	 * @param channelIndex the index of the channel that the fed watermark belongs to (index starting from 0)
	 */
	public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
		// ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
		if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
			long watermarkMillis = watermark.getTimestamp();

			// if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
			if (watermarkMillis > channelStatuses[channelIndex].watermark) {
				channelStatuses[channelIndex].watermark = watermarkMillis;

				// previously unaligned input channels are now aligned if its watermark has caught up
				if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
					channelStatuses[channelIndex].isWatermarkAligned = true;
				}

				// now, attempt to find a new min watermark across all aligned channels
				findAndOutputNewMinWatermarkAcrossAlignedChannels();
			}
		}
	}

	/**
	 * Feed a {@link StreamStatus} into the valve. This may trigger the valve to output either a new Stream Status,
	 * for which {@link DataOutput#emitStreamStatus(StreamStatus)} will be called, or a new Watermark,
	 * for which {@link DataOutput#emitWatermark(Watermark)} will be called.
	 *
	 * @param streamStatus the stream status to feed to the valve
	 * @param channelIndex the index of the channel that the fed stream status belongs to (index starting from 0)
	 */
	public void inputStreamStatus(StreamStatus streamStatus, int channelIndex) throws Exception {
		// only account for stream status inputs that will result in a status change for the input channel
		if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) {
			// handle active -> idle toggle for the input channel
			channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;

			// the channel is now idle, therefore not aligned
			channelStatuses[channelIndex].isWatermarkAligned = false;

			// if all input channels of the valve are now idle, we need to output an idle stream
			// status from the valve (this also marks the valve as idle)
			if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {

				// now that all input channels are idle and no channels will continue to advance its watermark,
				// we should "flush" all watermarks across all channels; effectively, this means emitting
				// the max watermark across all channels as the new watermark. Also, since we already try to advance
				// the min watermark as channels individually become IDLE, here we only need to perform the flush
				// if the watermark of the last active channel that just became idle is the current min watermark.
				if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
					findAndOutputMaxWatermarkAcrossAllChannels();
				}

				lastOutputStreamStatus = StreamStatus.IDLE;
				output.emitStreamStatus(lastOutputStreamStatus);
			} else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
				// if the watermark of the channel that just became idle equals the last output
				// watermark (the previous overall min watermark), we may be able to find a new
				// min watermark from the remaining aligned channels
				findAndOutputNewMinWatermarkAcrossAlignedChannels();
			}
		} else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) {
			// handle idle -> active toggle for the input channel
			channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;

			// if the last watermark of the input channel, before it was marked idle, is still larger than
			// the overall last output watermark of the valve, then we can set the channel to be aligned already.
			if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) {
				channelStatuses[channelIndex].isWatermarkAligned = true;
			}

			// if the valve was previously marked to be idle, mark it as active and output an active stream
			// status because at least one of the input channels is now active
			if (lastOutputStreamStatus.isIdle()) {
				lastOutputStreamStatus = StreamStatus.ACTIVE;
				output.emitStreamStatus(lastOutputStreamStatus);
			}
		}
	}

	private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
		long newMinWatermark = Long.MAX_VALUE;
		boolean hasAlignedChannels = false;

		// determine new overall watermark by considering only watermark-aligned channels across all channels
		for (InputChannelStatus channelStatus : channelStatuses) {
			if (channelStatus.isWatermarkAligned) {
				hasAlignedChannels = true;
				newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
			}
		}

		// we acknowledge and output the new overall watermark if it really is aggregated
		// from some remaining aligned channel, and is also larger than the last output watermark
		if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
			lastOutputWatermark = newMinWatermark;
			output.emitWatermark(new Watermark(lastOutputWatermark));
		}
	}

	private void findAndOutputMaxWatermarkAcrossAllChannels() throws Exception {
		long maxWatermark = Long.MIN_VALUE;

		for (InputChannelStatus channelStatus : channelStatuses) {
			maxWatermark = Math.max(channelStatus.watermark, maxWatermark);
		}

		if (maxWatermark > lastOutputWatermark) {
			lastOutputWatermark = maxWatermark;
			output.emitWatermark(new Watermark(lastOutputWatermark));
		}
	}

	/**
	 * An {@code InputChannelStatus} keeps track of an input channel's last watermark, stream
	 * status, and whether or not the channel's current watermark is aligned with the overall
	 * watermark output from the valve.
	 *
	 * <p>There are 2 situations where a channel's watermark is not considered aligned:
	 * <ul>
	 *   <li>the current stream status of the channel is idle
	 *   <li>the stream status has resumed to be active, but the watermark of the channel hasn't
	 *   caught up to the last output watermark from the valve yet.
	 * </ul>
	 */
	protected static class InputChannelStatus {
		protected long watermark;
		protected StreamStatus streamStatus;
		protected boolean isWatermarkAligned;

		/**
		 * Utility to check if at least one channel in a given array of input channels is active.
		 */
		private static boolean hasActiveChannels(InputChannelStatus[] channelStatuses) {
			for (InputChannelStatus status : channelStatuses) {
				if (status.streamStatus.isActive()) {
					return true;
				}
			}
			return false;
		}
	}

	protected InputChannelStatus getInputChannelStatus(int channelIndex) {
		checkArgument(
			channelIndex >= 0 && channelIndex < channelStatuses.length,
			"Invalid channel index. Number of input channels: " + channelStatuses.length);

		return channelStatuses[channelIndex];
	}
}
```

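When a watermark arrives on a channel, inputWatermark proceeds as follows (a watermark that arrives while its channel, or the valve as a whole, is idle is ignored):

  1. Check whether the new watermark is meaningful, i.e. whether its timestamp is later than the latest watermark previously received on that channel. If it is, update the watermark in the channel's status.
  2. If the channel is not yet aligned, check whether the new watermark's timestamp has caught up with lastOutputWatermark (is greater than or equal to it), that is, whether it is still a lagging watermark as in case 2 above. If it is no longer lagging, mark the channel as aligned.
  3. Scan the watermarks of all aligned channels and take the earliest timestamp. If at least one aligned channel exists and this earliest timestamp is later than lastOutputWatermark, emit it downstream as the new watermark.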
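The three steps can be reproduced with a small stand-alone model. This is a simplified sketch, not the Flink class: the hypothetical `ValveSketch` keeps per-channel state in plain arrays, skips the valve-wide idle check, and collects emitted watermarks in a list instead of calling DataOutput.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of StatusWatermarkValve#inputWatermark; not the Flink class.
class ValveSketch {
    final long[] watermarks;
    final boolean[] active;
    final boolean[] aligned;
    long lastOutputWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    ValveSketch(int numChannels) {
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        active = new boolean[numChannels];
        Arrays.fill(active, true);
        aligned = new boolean[numChannels];
        Arrays.fill(aligned, true);
    }

    void inputWatermark(long wm, int ch) {
        // watermarks from idle channels are ignored
        if (!active[ch]) {
            return;
        }
        // step 1: only a watermark newer than this channel's last one matters
        if (wm <= watermarks[ch]) {
            return;
        }
        watermarks[ch] = wm;
        // step 2: a lagging channel becomes aligned once it catches up
        if (!aligned[ch] && wm >= lastOutputWatermark) {
            aligned[ch] = true;
        }
        // step 3: emit the min over aligned channels if it advances the output
        long min = Long.MAX_VALUE;
        boolean any = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                any = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (any && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            emitted.add(min);
        }
    }

    public static void main(String[] args) {
        ValveSketch valve = new ValveSketch(2);
        valve.inputWatermark(10, 0); // channel 1 is still at MIN_VALUE: nothing emitted
        valve.inputWatermark(7, 1);  // min(10, 7) = 7 is emitted
        valve.inputWatermark(5, 1);  // older than channel 1's 7: ignored
        valve.inputWatermark(12, 1); // min(10, 12) = 10 is emitted
        System.out.println(valve.emitted); // [7, 10]
    }
}
```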


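When a channel switches from active to idle in inputStreamStatus, the channel is first marked as unaligned, and then:

  • If all channels are now idle and this channel's watermark equals lastOutputWatermark, the valve performs a FlushAll: it emits the largest watermark across all channel states, to trigger the latest computation the downstream operators can possibly perform. Whenever all channels are idle, the valve also switches itself to IDLE and emits that status downstream.
  • If other channels are still active and this channel's watermark equals lastOutputWatermark, the valve runs step 3 of the watermark procedure above and emits the earliest watermark among the aligned channels.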
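The active-to-idle branch can be sketched the same way. Again this is a hypothetical, simplified model rather than Flink's code: `IdleTransitionSketch.onChannelIdle` receives the per-channel state as plain arrays and returns the watermark the valve would emit, or `null` when it would emit none; emitting the IDLE stream status itself is left out.

```java
// Hypothetical model of the active -> idle branch of StatusWatermarkValve#inputStreamStatus.
class IdleTransitionSketch {

    /** Returns the watermark to emit, or null if nothing is emitted. */
    static Long onChannelIdle(long[] watermarks, boolean[] active, boolean[] aligned,
                              long lastOutputWatermark, int ch) {
        active[ch] = false;
        aligned[ch] = false; // an idle channel is never aligned

        boolean anyActive = false;
        for (boolean a : active) {
            anyActive |= a;
        }

        // only the channel that was holding back the output can unblock it
        if (watermarks[ch] != lastOutputWatermark) {
            return null;
        }
        if (!anyActive) {
            // all channels idle: flush the max watermark across all channels
            long max = Long.MIN_VALUE;
            for (long w : watermarks) {
                max = Math.max(max, w);
            }
            return max > lastOutputWatermark ? max : null;
        }
        // other channels still active: recompute the min over aligned channels
        long min = Long.MAX_VALUE;
        boolean anyAligned = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                anyAligned = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyAligned && min > lastOutputWatermark ? min : null;
    }

    public static void main(String[] args) {
        // channel 0 is at 5 and holds back the output; channel 1 is at 9
        long[] wms = {5, 9};
        boolean[] active = {true, true};
        boolean[] aligned = {true, true};
        System.out.println(onChannelIdle(wms, active, aligned, 5, 0)); // 9
    }
}
```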




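On the receiving side, the watermark emitted by the valve reaches the operator's processWatermark. AbstractStreamOperator first advances the event-time timers (if the operator has a timer service) and then forwards the watermark downstream. Two-input operators keep one watermark per input and call processWatermark only when the minimum of the two advances:

```java
// AbstractStreamOperator.class, line 566
public void processWatermark(Watermark mark) throws Exception {
	if (timeServiceManager != null) {
		timeServiceManager.advanceWatermark(mark);
	}
	output.emitWatermark(mark);
}

public void processWatermark1(Watermark mark) throws Exception {
	input1Watermark = mark.getTimestamp();
	long newMin = Math.min(input1Watermark, input2Watermark);
	if (newMin > combinedWatermark) {
		combinedWatermark = newMin;
		processWatermark(new Watermark(combinedWatermark));
	}
}

public void processWatermark2(Watermark mark) throws Exception {
	input2Watermark = mark.getTimestamp();
	long newMin = Math.min(input1Watermark, input2Watermark);
	if (newMin > combinedWatermark) {
		combinedWatermark = newMin;
		processWatermark(new Watermark(combinedWatermark));
	}
}
```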

AbstractStreamOperatorV2, the newer base class intended mainly for multiple-input operators, generalizes this to any number of inputs:

```java
// AbstractStreamOperatorV2.class, line 473
protected void reportWatermark(Watermark mark, int inputId) throws Exception {
	inputWatermarks[inputId - 1] = mark.getTimestamp();
	long newMin = mark.getTimestamp();
	for (long inputWatermark : inputWatermarks) {
		newMin = Math.min(inputWatermark, newMin);
	}
	if (newMin > combinedWatermark) {
		combinedWatermark = newMin;
		processWatermark(new Watermark(combinedWatermark));
	}
}
```
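The min-combining rule can be tried out in isolation. This plain-Java sketch (`CombinedWatermarkSketch` is a made-up name) mirrors reportWatermark above, with 1-based input ids, and records forwarded watermarks in a list where the operator would call processWatermark. Because every input starts at Long.MIN_VALUE, nothing is forwarded until each input has reported at least one watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the min-combining in AbstractStreamOperatorV2#reportWatermark.
class CombinedWatermarkSketch {
    final long[] inputWatermarks;
    long combinedWatermark = Long.MIN_VALUE;
    final List<Long> forwarded = new ArrayList<>();

    CombinedWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    void reportWatermark(long timestamp, int inputId) {
        inputWatermarks[inputId - 1] = timestamp; // input ids are 1-based
        long newMin = timestamp;
        for (long w : inputWatermarks) {
            newMin = Math.min(w, newMin);
        }
        // only a strictly larger minimum is forwarded
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            forwarded.add(newMin);
        }
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch op = new CombinedWatermarkSketch(3);
        op.reportWatermark(10, 1);
        op.reportWatermark(20, 2);
        op.reportWatermark(15, 3); // all inputs reported: min = 10
        op.reportWatermark(30, 1); // min(30, 20, 15) = 15
        System.out.println(op.forwarded); // [10, 15]
    }
}
```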


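timeServiceManager.advanceWatermark passes the watermark's timestamp on to each of the operator's timer services, where InternalTimerServiceImpl fires the event-time timers that have become due:

```java
// InternalTimerServiceImpl.class, line 268
public void advanceWatermark(long time) throws Exception {
	currentWatermark = time;

	InternalTimer<K, N> timer;

	while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
		eventTimeTimersQueue.poll();
		keyContext.setCurrentKey(timer.getKey());
		triggerTarget.onEventTime(timer);
	}
}
```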

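The basic logic: when a new watermark arrives, every timer in the event-time timer queue whose timestamp is at or before the watermark is removed from the queue, the timer's key is set as the current key, and the corresponding Triggerable#onEventTime callback is invoked. Single-input Triggerables include the window-related WindowOperator and its subclasses, as well as the window-independent KeyedProcessOperator; two-input Triggerables include IntervalJoinOperator, KeyedCoProcessOperator and CoBroadcastWithKeyedOperator.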
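The timer-firing loop can be modelled with a java.util.PriorityQueue ordered by timestamp. This is a simplified sketch: the hypothetical `Timer` class carries only a key and a timestamp (no namespace, no de-duplication), `currentKey` stands in for the key context, and a plain `Consumer` stands in for Triggerable#onEventTime.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Simplified model of InternalTimerServiceImpl#advanceWatermark; not the Flink class.
class TimerServiceSketch {

    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    final PriorityQueue<Timer> eventTimeTimersQueue =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    long currentWatermark = Long.MIN_VALUE;
    String currentKey; // stands in for keyContext.setCurrentKey(...)
    final Consumer<Timer> onEventTime;

    TimerServiceSketch(Consumer<Timer> onEventTime) {
        this.onEventTime = onEventTime;
    }

    void registerEventTimeTimer(String key, long timestamp) {
        eventTimeTimersQueue.add(new Timer(key, timestamp));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        // fire every timer whose timestamp is <= the new watermark, earliest first
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            eventTimeTimersQueue.poll();
            currentKey = timer.key;
            onEventTime.accept(timer);
        }
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        TimerServiceSketch service = new TimerServiceSketch(t -> fired.add(t.key + "@" + t.timestamp));
        service.registerEventTimeTimer("b", 20);
        service.registerEventTimeTimer("a", 10);
        service.registerEventTimeTimer("c", 30);
        service.advanceWatermark(25);
        System.out.println(fired); // [a@10, b@20]
    }
}
```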



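Operators without a keyed context have no timer service (timeServiceManager is null), so for them processWatermark simply forwards the watermark. Some of them, such as the non-keyed ProcessOperator, additionally override processWatermark to remember the latest watermark so that the user function can query it:

```java
public void processWatermark(Watermark mark) throws Exception {
	super.processWatermark(mark);
	currentWatermark = mark.getTimestamp();
}
```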


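The Async I/O operator AsyncWaitOperator puts a watermark into its async I/O queue as an element that has already completed (see WatermarkQueueEntry); what happens next depends on whether the operator emits results in order or unordered. The arrival of a watermark always triggers an attempt to output completed Async I/O results (outputCompletedElement, which calls StreamElementQueue#emitCompletedElement).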

```java
// AsyncWaitOperator.class, line 199
public void processWatermark(Watermark mark) throws Exception {
	addToWorkQueue(mark);

	// watermarks are always completed
	// if there is no prior element, we can directly emit them
	// this also avoids watermarks being held back until the next element has been processed
	outputCompletedElement();
}

// AsyncWaitOperator.class, line 254
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {

	Optional<ResultFuture<OUT>> queueEntry;
	while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
		mailboxExecutor.yield();
	}

	return queueEntry.get();
}

// AsyncWaitOperator.class, line 277
private void outputCompletedElement() {
	if (queue.hasCompletedElements()) {
		// emit only one element to not block the mailbox thread unnecessarily
		queue.emitCompletedElement(timestampedCollector);
		// if there are more completed elements, emit them with subsequent mails
		if (queue.hasCompletedElements()) {
			mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
		}
	}
}
```


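Watermark handling in the ordered Async I/O queue (results are emitted in input order)

OrderedStreamElementQueue keeps all entries in a single FIFO queue and only ever emits the head, and only once the head has completed. A WatermarkQueueEntry is complete from the moment it is enqueued, so a watermark is emitted as soon as every element ahead of it has been emitted.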
```java
// OrderedStreamElementQueue.class, line 62
public boolean hasCompletedElements() {
	return !queue.isEmpty() && queue.peek().isDone();
}

public void emitCompletedElement(TimestampedCollector<OUT> output) {
	if (hasCompletedElements()) {
		final StreamElementQueueEntry<OUT> head = queue.poll();
		head.emitResult(output);
	}
}

// OrderedStreamElementQueue.class, line 95
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
	if (queue.size() < capacity) {
		StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

		queue.add(queueEntry);

		LOG.debug("Put element into ordered stream element queue. New filling degree " +
			"({}/{}).", queue.size(), capacity);

		return Optional.of(queueEntry);
	} else {
		LOG.debug("Failed to put element into ordered stream element queue because it " +
			"was full ({}/{}).", queue.size(), capacity);

		return Optional.empty();
	}
}

private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
	if (streamElement.isRecord()) {
		return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
	}
	if (streamElement.isWatermark()) {
		return new WatermarkQueueEntry<>((Watermark) streamElement);
	}
	throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
```
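The ordered queue's behaviour can be reproduced with an ArrayDeque. In this hypothetical sketch, `Entry` stands in for StreamElementQueueEntry and its `done` flag for the completion of the async request; a watermark entry is created already done, mirroring WatermarkQueueEntry, and `drain` loops where AsyncWaitOperator would schedule one mail per element.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of OrderedStreamElementQueue; not the Flink class.
class OrderedQueueSketch {

    static final class Entry {
        final String label;
        boolean done;

        Entry(String label, boolean done) {
            this.label = label;
            this.done = done;
        }
    }

    final ArrayDeque<Entry> queue = new ArrayDeque<>();
    final int capacity;
    final List<String> output = new ArrayList<>();

    OrderedQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    // records start incomplete; watermarks are complete as soon as they are queued
    Entry tryPut(String label, boolean isWatermark) {
        if (queue.size() >= capacity) {
            return null; // full: the caller would have to wait
        }
        Entry e = new Entry(label, isWatermark);
        queue.add(e);
        return e;
    }

    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().done;
    }

    void emitCompletedElement() {
        if (hasCompletedElements()) {
            output.add(queue.poll().label);
        }
    }

    // emit completed heads one at a time until the head is incomplete
    void drain() {
        while (hasCompletedElements()) {
            emitCompletedElement();
        }
    }

    public static void main(String[] args) {
        OrderedQueueSketch q = new OrderedQueueSketch(10);
        Entry r1 = q.tryPut("r1", false);
        q.tryPut("W(5)", true);
        Entry r2 = q.tryPut("r2", false);
        r2.done = true;
        q.drain(); // head r1 is not done: nothing is emitted
        r1.done = true;
        q.drain();
        System.out.println(q.output); // [r1, W(5), r2]
    }
}
```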


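Watermark handling in the unordered Async I/O queue (results are emitted in completion order)

UnorderedStreamElementQueue splits its entries into segments separated by watermarks. addWatermark puts the watermark into a segment of its own (reusing the last segment if it is already empty) and then opens a new segment for the records that follow. Only the first segment may emit: records within a segment are emitted in whatever order their results complete, but no record can overtake a watermark, and a watermark is emitted only after all records before it have been.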
```java
// UnorderedStreamElementQueue.class, line 140
public boolean hasCompletedElements() {
	return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
}

public void emitCompletedElement(TimestampedCollector<OUT> output) {
	if (segments.isEmpty()) {
		return;
	}
	final Segment currentSegment = segments.getFirst();
	numberOfEntries -= currentSegment.emitCompleted(output);

	// remove any segment if there are further segments, if not leave it as an optimization even if empty
	if (segments.size() > 1 && currentSegment.isEmpty()) {
		segments.pop();
	}
}

// UnorderedStreamElementQueue.class, line 121
private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
	Segment<OUT> watermarkSegment;
	if (!segments.isEmpty() && segments.getLast().isEmpty()) {
		// reuse already existing segment if possible (completely drained) or the new segment added at the end of
		// this method for two succeeding watermarks
		watermarkSegment = segments.getLast();
	} else {
		watermarkSegment = addSegment(1);
	}

	StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
	watermarkSegment.add(watermarkEntry);

	// add a new segment for actual elements
	addSegment(capacity);
	return watermarkEntry;
}
```
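The segment mechanism can be modelled in a few dozen lines. In this hypothetical sketch, completion of an async request is simulated with `complete`, capacity bookkeeping is omitted, and `emitCompleted` drains every completed entry of the first segment at once (Flink emits one entry per call); drained segments are dropped while later segments exist.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of UnorderedStreamElementQueue's segments; not the Flink class.
class UnorderedQueueSketch {

    // a segment holds incomplete entries and completed-but-not-yet-emitted entries
    static final class Segment {
        final Set<String> incomplete = new LinkedHashSet<>();
        final List<String> completed = new ArrayList<>();

        boolean isEmpty() {
            return incomplete.isEmpty() && completed.isEmpty();
        }
    }

    final ArrayDeque<Segment> segments = new ArrayDeque<>();
    final List<String> output = new ArrayList<>();

    UnorderedQueueSketch() {
        segments.add(new Segment());
    }

    void addRecord(String label) {
        segments.getLast().incomplete.add(label);
    }

    void addWatermark(String label) {
        // reuse an empty last segment, otherwise open one for the watermark
        Segment watermarkSegment = segments.getLast().isEmpty() ? segments.getLast() : addSegment();
        watermarkSegment.completed.add(label); // watermarks are complete immediately
        addSegment(); // subsequent records go into a new segment
    }

    private Segment addSegment() {
        Segment s = new Segment();
        segments.addLast(s);
        return s;
    }

    // simulate the async result of a record arriving
    void complete(String label) {
        for (Segment s : segments) {
            if (s.incomplete.remove(label)) {
                s.completed.add(label);
                return;
            }
        }
    }

    // only the first segment may emit; drop it once drained if more segments follow
    void emitCompleted() {
        while (true) {
            Segment first = segments.getFirst();
            output.addAll(first.completed);
            first.completed.clear();
            if (segments.size() > 1 && first.isEmpty()) {
                segments.pop();
            } else {
                return;
            }
        }
    }

    public static void main(String[] args) {
        UnorderedQueueSketch q = new UnorderedQueueSketch();
        q.addRecord("r1");
        q.addRecord("r2");
        q.addWatermark("W(5)");
        q.addRecord("r3");
        q.complete("r3");
        q.complete("r2");
        q.emitCompleted(); // r2 may overtake r1, but r3 must wait behind W(5)
        System.out.println(q.output); // [r2]
        q.complete("r1");
        q.emitCompleted();
        System.out.println(q.output); // [r2, r1, W(5), r3]
    }
}
```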

