运行Flink作业报错:Job execution failed.,flink jobgraph

运行Flink作业报错:Job execution failed.项目场景:
一个将本地文本文件写入到MySQL的Flink作业 问题描述
运行作业时报错:Exception in thread \”main\”

项目场景:

Flink作业将本地文本文件写入MySQL

问题描述

运行作业时出错:线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。

由于错误而无法完成Flink 作业

错误信息:

线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。

在org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)

在org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)

在java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

在java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

在java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

在org.apache.flink.runtime.rpc.akka.AkkaInvocalHandler.lambda$invokeRpc$0(AkkaInvocalHandler.java:237)

在java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

在java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

在java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

在org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)

在akka.dispatch.OnComplete.internal(Future.scala:264)

在akka.dispatch.OnComplete.internal(Future.scala:261)

在akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)

在akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)

在scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

在org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)

在scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)

在scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)

在akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)

在akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)

在akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)

在scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)

在scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)

在scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

在akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)

在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)

在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

与scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)

在akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)

在akka.dispatch.TaskInitation.run(AbstractDispatcher.scala:40)

在akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)

在akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

在akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

在akka.dispatch.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)

在akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

原因: org.apache.flink.runtime.JobException: 恢复被NoRestartBackoffTimeStrategy 抑制

在org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)

在org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)

在org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)

在org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)

在org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)

在org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)

在org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)

在org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)

sun.reflect.NativeMethodAccessorImpl.invoke0(原生方法)

在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

在java.lang.reflect.Method.invoke(Method.java:498)

在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInitation(AkkaRpcActor.java:305)

在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

在org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

在akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

在akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

在scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

在akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

在akka.actor.Actor$class.aroundReceive(Actor.scala:517)

在akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

在akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

在akka.actor.ActorCell.invoke(ActorCell.scala:561)

在akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

在akka.dispatch.Mailbox.run(Mailbox.scala:225)

在akka.dispatch.Mailbox.exec(Mailbox.scala:235)

.还有4 项任务

原因: java.io.IOException: 无法打开JDBC 编写器

在org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)

在org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115)

在org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)

在org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)

在org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

在org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)

在org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)

在org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)

在org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)

在org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)

在org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)

在org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)

在java.lang.Thread.run(Thread.java:748)

原因: com.mysql.cj.jdbc.Exception.CommunicationsException: 通信链路故障

上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。

在com.mysql.cj.jdbc.Exceptions.SQLError.createCommunicationsException(SQLError.java:174)

在com.mysql.cj.jdbc.Exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)

在com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)

在com.mysql.cj.jdbc.ConnectionImpl.init(ConnectionImpl.java:448)

在com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)

在com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)

在org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.get或ExplainConnection(SimpleJdbcConnectionProvider.java:121)

在org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)

.12 更多

原因: com.mysql.cj.Exceptions.CJCommunicationsException: 通信链路故障

上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。

sun.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法)

在sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

在sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

在java.lang.reflect.Constructor.newInstance(Constructor.java:423)

在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)

在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)

在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)

在com.mysql.cj.Exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)

在com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378)

在com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205)

在com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433)

在com.mysql.cj.NativeSession.connect(NativeSession.java:133)

在com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948)

在com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818)

.17 更多

原因: javax.net.ssl.SSLException: 不支持的记录版本Unknown-0.0

在sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552)

在sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565)

在sun.security.ssl.InputRecord.read(InputRecord.java:529)

在sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)

在sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)

在sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

在sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)

在com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347)

在com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191)

在com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101)

在com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369)

.22 更多

进程退出,退出代码为1

############################################## ## ## # ################################

原始代码:

com.lcvc 包

导入org.apache.flink.api.scala._

导入org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

导入org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}

导入java.sql.PreparedStatement

案例类采购(customerId: Int,productId: Int,quantity: Int)

对象接收器到Mysql {

def main(args: 数组[字符串]): 单位={

val env=StreamExecutionEnvironment.getExecutionEnvironment

//加载数据源

val input=env.readTextFile(\’输入/购买.log\’)

val 购买=input.map { line=

val 字段=line.split(\’,\’)

购买(字段(0).toInt,字段(1).toInt,字段(2).toInt)

}

//创建一个JDBC接收器

val 接收器=JdbcSink.sink(

\’插入客户(customerId,productId,total)VALUES(?)重复键更新总计=总计+ VALUES(总计)\’,

新建JdbcStatementBuilder[购买]{

覆盖def 接受(t: 准备语句,u: 购买)={

t.setInt(1,u.customerId)

t.setInt(2,u.productId)

t.setInt(3,u.数量)

}

},

新的JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl(\’jdbc:mysql://master:3306/eco\’)

.withDriverName(\’com.mysql.cj.jdbc.Driver\’)

.withUsername(\’root\’)

.withPassword(\’密码123456$\’)

。建造()

//将作业发送到接收器

购买.addSink(Sink)

//启动flink作业

环境.execute()

}

}

原因分析:

线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。

Flink运行作业时抛出异常。这通常意味着运行Flink 作业时发生了一些错误,作业无法成功完成。

原因: org.apache.flink.runtime.JobException: 恢复被NoRestartBackoffTimeStrategy 抑制

Flink 在运行作业时遇到错误。配置了NoRestartBackoffTimeStrategy,因此不会尝试重新启动作业。

原因: java.io.IOException: 无法打开JDBC 编写器

Java IO 异常,指示尝试打开JDBC 编写器时出现问题。这通常意味着程序在连接数据库或执行写入操作时遇到问题。

原因: com.mysql.cj.jdbc.Exception.CommunicationsException: 通信链路故障

与MySQL 数据库通信失败

上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。

最后一个数据包已成功发送到服务器。这发生在0 毫秒前。

JDBC 驱动程序无法从服务器接收响应数据包

原因: javax.net.ssl.SSLException: 不支持的记录版本Unknown-0.0

使用SSL 进行安全连接时,客户端和服务器之间的SSL 协议版本不兼容

根据错误报告,问题是Flink作业与MySQL数据库的连接问题。

数据库版本是5.7.18,但pom.xml文件显示8.0.31。

代码中使用的驱动类为com.mysql.cj.jdbc.Driver,以兼容pom文件中的版本。

MySQL 8.0及更早版本使用com.mysql.jdbc.Driver,由于兼容性问题无法连接数据库。

解决方案:

修改pom.xml文件:

使用与集群数据库相同或相似版本的依赖项

具体类代码改动:

useSSL=false:禁用SSL 连接以避免与SSL 相关的警告。

修改后的源码:

com.lcvc 包

导入org.apache.flink.api.scala._

导入org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

导入org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}

导入java.sql.PreparedStatement

案例类采购(customerId: Int,productId: Int,quantity: Int)

对象接收器到Mysql {

def main(args: 数组[字符串]): 单位={

val env=StreamExecutionEnvironment.getExecutionEnvironment

//加载数据源

val input=env.readTextFile(\’输入/购买.log\’)

val 购买=input.map { line=

val 字段=line.split(\’,\’)

购买(字段(0).toInt,字段(1).toInt,字段(2).toInt)

}

//创建一个JDBC接收器

val 接收器=JdbcSink.sink(

\’插入客户(customerId,productId,total)VALUES(?)重复键更新总计=总计+ VALUES(总计)\’,

新建JdbcStatementBuilder[购买]{

覆盖def 接受(t: 准备语句,u: 购买)={

t.setInt(1,u.customerId)

t.setInt(2,u.productId)

t.setInt(3,u.数量)

}

},

新的JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl(\’jdbc:mysql://master:3306/eco?useSSL=false\’)

//.withDriverName(\’com.mysql.cj.jdbc.Driver\’)

.withDriverName(\’com.mysql.jdbc.Driver\’)

.withUsername(\’root\’)

.withPassword(\’密码123$\’)

。建造()

//将作业发送到接收器

购买.addSink(Sink)

//启动flink作业

环境.execute()

}

}

问题解决了

以上运行#Flink 作业错误:源网络的相关内容仅供参考。相关信息请参见官方公告。

原创文章,作者:CSDN,如若转载,请注明出处:https://www.sudun.com/ask/93372.html

Like (0)
CSDN的头像CSDN
Previous 2024年7月6日
Next 2024年7月6日

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注