package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.class */
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
    private static final String EVENT_LOSS_ERROR_MESSAGE = "An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
    private final SubtaskAccess subtaskAccess;
    private final EventSender sender;
    private final Executor sendingExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubtaskGatewayImpl(SubtaskAccess subtaskAccess, EventSender eventSender, Executor executor) {
        this.subtaskAccess = subtaskAccess;
        this.sender = eventSender;
        this.sendingExecutor = executor;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway
    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent) {
        if (!isReady()) {
            throw new FlinkRuntimeException("SubtaskGateway is not ready, task not yet running.");
        }
        try {
            Callable<CompletableFuture<Acknowledge>> createEventSendAction = this.subtaskAccess.createEventSendAction(new SerializedValue<>(operatorEvent));
            CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
            FutureUtils.assertNoException(completableFuture.handleAsync((acknowledge, th) -> {
                if (th == null || !this.subtaskAccess.isStillRunning()) {
                    return null;
                }
                this.subtaskAccess.triggerTaskFailover(new FlinkException(String.format(EVENT_LOSS_ERROR_MESSAGE, operatorEvent, this.subtaskAccess.subtaskName()), th));
                return null;
            }, this.sendingExecutor));
            this.sendingExecutor.execute(() -> {
                this.sender.sendEvent(createEventSendAction, completableFuture);
            });
            return completableFuture;
        } catch (IOException e) {
            throw new FlinkRuntimeException("Cannot serialize operator event", e);
        }
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway
    public ExecutionAttemptID getExecution() {
        return this.subtaskAccess.currentAttempt();
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway
    public int getSubtask() {
        return this.subtaskAccess.getSubtaskIndex();
    }

    private boolean isReady() {
        return this.subtaskAccess.hasSwitchedToRunning().isDone();
    }
}
