Structured Streaming + Kafka Integration 跑一个Demo
参照官方的 spark.apache.org/docs/latest…
启动kafka
//官方下载kafka和zookeeper,并解压...
//kafka启动前需要启动zk
$ bin/zookeeper-server-start.sh config/zookeeper.properties
//启动kafka,默认本机9092
$ bin/kafka-server-start.sh config/server.properties
//使用kafka自带脚本创建一个 名字叫quickstart-events的 topic
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
//启动一个生产者,这样就可以在shell中给topic发消息
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
复制代码
编写流计算代码,并在IDEA中跑起来
参考了org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount
1.新建一个Java Maven工程
2.pom加入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
//可选 本地debug输出日志需要这个
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
</dependency>
复制代码
3.写一个main方法,并且Run…
public static void main(String[] args) throws Exception {
SparkConf config = new SparkConf().setAppName("JavaStructuredKafkaWordCount").setMaster("local[1]");
SparkContext sparkContext = new SparkContext(config);
SparkSession spark = new SparkSession(sparkContext);
// Create DataSet representing the stream of input lines from kafka
Dataset<String> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "quickstart-events")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
// Generate running word count
Dataset<org.apache.spark.sql.Row> wordCounts = lines.flatMap(
(FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
Encoders.STRING()).groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
复制代码
4.通过Step1中的Kafka producer 随便打几个单词…
> hello a
> hello b
> ccc
复制代码
5.可以通过IDEA控制台查看每个单词出现的次数…
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hel| 1|
| owen| 3|
|uewior| 1|
| owen2| 1|
| wu| 1|
+------+-----+
复制代码
将代码打包交给Spark集群来运行(生产环境一般是这种)
pom中加入打fat-jar的插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-5</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>
xxx.xxx.client.SparkApplication
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
复制代码
maven package…
在spark官网下载spark 3.xx版本解压,在目录下执行
./bin/spark-submit \
--class xxx.xxx.client.SparkApplication \
--master 'local[2]' \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
/Users/xxx.xxx-latest-SNAPSHOT-jar-with-dependencies.jar \
1000 >> ~/Desktop/xxxxxxx.temp
复制代码
然后去 ~/Desktop/xxxxxxx.temp 这个文件中查看实时计算结果
介绍
Structured Streaming 和 Spark Streaming是什么关系呢?
1.编程模型不同 2.Structured Streaming是 2.x版本新功能,未来会取代Spark Streaming
3.如上图,Structured Streaming是Spark SQL的一部分。
structured-streaming-programming-guide
Source ——> Structured Streaming ——> Sink
内置的Source有 File、Kafka、Socket、Rate Source。
如何将自研的mq集成为 Streaming的Source呢?
1.将mq中的消费到的 数据,transfer到 Socket中,交由Streaming来处理。但是这样的方式问题很多,无法实现容错,无法管理offset,重启将导致消息丢失
2.正经做法还是自定义一个Source,比如RocketMQ Source这样的。
……TODO
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END