项目场景:
Flink作业将本地文本文件写入MySQL
问题描述
运行作业时出错:线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。
由于错误而无法完成Flink 作业
错误信息:
线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。
在org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
在org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
在java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
在java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
在java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
在org.apache.flink.runtime.rpc.akka.AkkaInvocalHandler.lambda$invokeRpc$0(AkkaInvocalHandler.java:237)
在java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
在java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
在java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
在org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
在akka.dispatch.OnComplete.internal(Future.scala:264)
在akka.dispatch.OnComplete.internal(Future.scala:261)
在akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
在akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
在scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
在org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
在scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
在scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
在akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
在akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
在akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
在scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
在scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
在scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
在akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
在akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
与scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
在akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
在akka.dispatch.TaskInitation.run(AbstractDispatcher.scala:40)
在akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
在akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
在akka.dispatch.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)
在akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
原因: org.apache.flink.runtime.JobException: 恢复被NoRestartBackoffTimeStrategy 抑制
在org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
在org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
在org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
在org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
在org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
在org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
在org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
在org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
sun.reflect.NativeMethodAccessorImpl.invoke0(原生方法)
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:498)
在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInitation(AkkaRpcActor.java:305)
在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
在org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
在akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
在akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
在scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
在akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
与scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
在akka.actor.Actor$class.aroundReceive(Actor.scala:517)
在akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
在akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
在akka.actor.ActorCell.invoke(ActorCell.scala:561)
在akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
在akka.dispatch.Mailbox.run(Mailbox.scala:225)
在akka.dispatch.Mailbox.exec(Mailbox.scala:235)
.还有4 项任务
原因: java.io.IOException: 无法打开JDBC 编写器
在org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)
在org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115)
在org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
在org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
在org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
在org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
在org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
在org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
在org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
在org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
在org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
在org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
在java.lang.Thread.run(Thread.java:748)
原因: com.mysql.cj.jdbc.Exception.CommunicationsException: 通信链路故障
上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。
在com.mysql.cj.jdbc.Exceptions.SQLError.createCommunicationsException(SQLError.java:174)
在com.mysql.cj.jdbc.Exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
在com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
在com.mysql.cj.jdbc.ConnectionImpl.init(ConnectionImpl.java:448)
在com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
在com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
在org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.get或ExplainConnection(SimpleJdbcConnectionProvider.java:121)
在org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)
.12 更多
原因: com.mysql.cj.Exceptions.CJCommunicationsException: 通信链路故障
上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。
sun.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法)
在sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
在sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
在java.lang.reflect.Constructor.newInstance(Constructor.java:423)
在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
在com.mysql.cj.Exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
在com.mysql.cj.Exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
在com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378)
在com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205)
在com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433)
在com.mysql.cj.NativeSession.connect(NativeSession.java:133)
在com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948)
在com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818)
.17 更多
原因: javax.net.ssl.SSLException: 不支持的记录版本Unknown-0.0
在sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552)
在sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565)
在sun.security.ssl.InputRecord.read(InputRecord.java:529)
在sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
在sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
在sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
在sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
在com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347)
在com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191)
在com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101)
在com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369)
.22 更多
进程退出,退出代码为1
############################################## ## ## # ################################
原始代码:
com.lcvc 包
导入org.apache.flink.api.scala._
导入org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
导入org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
导入java.sql.PreparedStatement
案例类采购(customerId: Int,productId: Int,quantity: Int)
对象接收器到Mysql {
def main(args: 数组[字符串]): 单位={
val env=StreamExecutionEnvironment.getExecutionEnvironment
//加载数据源
val input=env.readTextFile(\’输入/购买.log\’)
val 购买=input.map { line=
val 字段=line.split(\’,\’)
购买(字段(0).toInt,字段(1).toInt,字段(2).toInt)
}
//创建一个JDBC接收器
val 接收器=JdbcSink.sink(
\’插入客户(customerId,productId,total)VALUES(?)重复键更新总计=总计+ VALUES(总计)\’,
新建JdbcStatementBuilder[购买]{
覆盖def 接受(t: 准备语句,u: 购买)={
t.setInt(1,u.customerId)
t.setInt(2,u.productId)
t.setInt(3,u.数量)
}
},
新的JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(\’jdbc:mysql://master:3306/eco\’)
.withDriverName(\’com.mysql.cj.jdbc.Driver\’)
.withUsername(\’root\’)
.withPassword(\’密码123456$\’)
。建造()
)
//将作业发送到接收器
购买.addSink(Sink)
//启动flink作业
环境.execute()
}
}
原因分析:
线程“main”中出现异常org.apache.flink.runtime.client.JobExecutionException: 作业执行失败。
Flink运行作业时抛出异常。这通常意味着运行Flink 作业时发生了一些错误,作业无法成功完成。
原因: org.apache.flink.runtime.JobException: 恢复被NoRestartBackoffTimeStrategy 抑制
Flink 在运行作业时遇到错误。配置了NoRestartBackoffTimeStrategy,因此不会尝试重新启动作业。
原因: java.io.IOException: 无法打开JDBC 编写器
Java IO 异常,指示尝试打开JDBC 编写器时出现问题。这通常意味着程序在连接数据库或执行写入操作时遇到问题。
原因: com.mysql.cj.jdbc.Exception.CommunicationsException: 通信链路故障
与MySQL 数据库通信失败
上次成功将数据包发送到服务器的时间是0 毫秒前。驱动程序未从服务器接收数据包。
最后一个数据包已成功发送到服务器。这发生在0 毫秒前。
JDBC 驱动程序无法从服务器接收响应数据包
原因: javax.net.ssl.SSLException: 不支持的记录版本Unknown-0.0
使用SSL 进行安全连接时,客户端和服务器之间的SSL 协议版本不兼容
根据错误报告,问题是Flink作业与MySQL数据库的连接问题。
数据库版本是5.7.18,但pom.xml文件显示8.0.31。
代码中使用的驱动类为com.mysql.cj.jdbc.Driver,以兼容pom文件中的版本。
MySQL 8.0及更早版本使用com.mysql.jdbc.Driver,由于兼容性问题无法连接数据库。
解决方案:
修改pom.xml文件:
使用与集群数据库相同或相似版本的依赖项
具体类代码改动:
useSSL=false:禁用SSL 连接以避免与SSL 相关的警告。
修改后的源码:
com.lcvc 包
导入org.apache.flink.api.scala._
导入org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
导入org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
导入java.sql.PreparedStatement
案例类采购(customerId: Int,productId: Int,quantity: Int)
对象接收器到Mysql {
def main(args: 数组[字符串]): 单位={
val env=StreamExecutionEnvironment.getExecutionEnvironment
//加载数据源
val input=env.readTextFile(\’输入/购买.log\’)
val 购买=input.map { line=
val 字段=line.split(\’,\’)
购买(字段(0).toInt,字段(1).toInt,字段(2).toInt)
}
//创建一个JDBC接收器
val 接收器=JdbcSink.sink(
\’插入客户(customerId,productId,total)VALUES(?)重复键更新总计=总计+ VALUES(总计)\’,
新建JdbcStatementBuilder[购买]{
覆盖def 接受(t: 准备语句,u: 购买)={
t.setInt(1,u.customerId)
t.setInt(2,u.productId)
t.setInt(3,u.数量)
}
},
新的JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(\’jdbc:mysql://master:3306/eco?useSSL=false\’)
//.withDriverName(\’com.mysql.cj.jdbc.Driver\’)
.withDriverName(\’com.mysql.jdbc.Driver\’)
.withUsername(\’root\’)
.withPassword(\’密码123$\’)
。建造()
)
//将作业发送到接收器
购买.addSink(Sink)
//启动flink作业
环境.execute()
}
}
问题解决了
以上运行#Flink 作业错误:源网络的相关内容仅供参考。相关信息请参见官方公告。
原创文章,作者:CSDN,如若转载,请注明出处:https://www.sudun.com/ask/93372.html