Fixing "ERROR MicroBatchExecution: Query [] terminated with error java.net.ConnectException: Connection refused"
Contents
The problem
The fix
The problem
While experimenting with Spark's real-time structured streaming, I ran into the following error. The full error message is below.
# Full error message
ERROR MicroBatchExecution: Query [id = , runId = ] terminated with error
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:...)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:...)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:...)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:...)
    at java.net.Socket.connect(Socket.java:...)
    at java.net.Socket.connect(Socket.java:...)
    at java.net.Socket.<init>(Socket.java:...)
    at java.net.Socket.<init>(Socket.java:...)
    at org.apache.spark.sql.execution.streaming.TextSocketSource.initialize(socket.scala:...)
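Note the bottom frame: the failure happens in `TextSocketSource.initialize`, which is just a plain TCP connect. The operating system refuses any connection to a port with no listener, and that is exactly what the exception reports. The same condition is easy to reproduce outside Spark; here is a minimal Python sketch (it grabs an ephemeral port and releases it, so a connect to that port is almost certainly refused):

```python
import socket

# Grab an ephemeral port from the OS, then release it, so a connect()
# to it is almost certainly refused -- the same condition Spark's
# socket source hits when nothing is listening on port 9999.
probe = socket.socket()
probe.bind(("localhost", 0))
closed_port = probe.getsockname()[1]
probe.close()

try:
    socket.create_connection(("localhost", closed_port), timeout=2)
    refused = False
except ConnectionRefusedError:
    refused = True

print("connection refused:", refused)
```

In other words, the stack trace does not point at a bug in the query itself; it points at the environment the query connects to.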
# This is presumably the WordCount demo everyone is experimenting with; the demo itself is fine, but the walkthrough never spells out one prerequisite!
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create a streaming DataFrame that listens to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split each line into individual words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Count the occurrences of each word
wordCounts = words.groupBy("word").count()

# Write the running counts to the console (standard output)
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
The fix
The cause: nothing was listening on the port before the program started. The socket source only connects to an existing server; it does not open the port itself.
So before launching the program, open the port first, with the following command:
nc -lk 9999
After that it just works: type some text into the nc session and the query counts it~
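If you want the script itself to fail fast with a clearer message instead of the long stack trace, you can check the port before building the stream. A small sketch (the helper name `port_is_open` is my own, not part of any Spark API):

```python
import socket

def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Call it right before `spark.readStream`, e.g. `if not port_is_open("localhost", 9999): raise SystemExit("Nothing is listening on localhost:9999 -- run `nc -lk 9999` first")`.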