Learning Stream Processing from Scratch: Structured Streaming + Kafka Integration

Running a Structured Streaming + Kafka Integration demo

Following the official guide at spark.apache.org/docs/latest…

Start Kafka

# Download Kafka from the official site and unpack it (the tarball ships with ZooKeeper)...

# ZooKeeper must be running before Kafka starts
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka; by default it listens on localhost:9092
$ bin/kafka-server-start.sh config/server.properties

# Use the bundled script to create a topic named quickstart-events
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

# Start a console producer so you can send messages to the topic from the shell
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
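
Alternatively, once the kafka-clients dependency added in the next step is on the classpath, you can publish test messages from Java instead of using the console producer. A minimal sketch (the topic and broker address match the commands above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Send a couple of test lines to the demo topic;
        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("quickstart-events", "hello a"));
            producer.send(new ProducerRecord<>("quickstart-events", "hello b"));
        }
    }
}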


Write the stream-processing code and run it in IDEA

Based on org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount

1. Create a new Java Maven project

2. Add the dependencies to the pom

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        
        <!-- Optional: needed for log output when debugging locally; see the logback.xml sketch after this block -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>
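
For the optional logback dependencies to actually do something, drop a minimal logback.xml into src/main/resources. A sketch (the WARN level is just a suggestion to keep Spark's INFO chatter out of the console):

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- WARN keeps Spark's verbose INFO output out of the IDEA console -->
    <root level="WARN">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>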


3. Write a main method and Run…

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public static void main(String[] args) throws Exception {

    // local[1] runs Spark in-process with a single worker thread -- fine for a demo
    SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredKafkaWordCount")
            .master("local[1]")
            .getOrCreate();

    // Create a Dataset representing the stream of input lines from Kafka
    Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "127.0.0.1:9092")
            .option("subscribe", "quickstart-events")
            .load()
            .selectExpr("CAST(value AS STRING)")   // the Kafka value column is binary
            .as(Encoders.STRING());

    // Split each line into words and keep a running count per word
    Dataset<Row> wordCounts = lines.flatMap(
            (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
            Encoders.STRING()).groupBy("value").count();

    // Start the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

    query.awaitTermination();
}
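
For fault tolerance across restarts you would normally also point the sink at a checkpoint directory, so Structured Streaming can recover offsets and state. A sketch reusing the wordCounts Dataset from above (the path is just a placeholder):

// Same query as above, but with a checkpoint location for recovery after restart
StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/structured-streaming-wordcount")
        .format("console")
        .start();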

4. Type a few words into the Kafka producer from Step 1…

> hello a
> hello b
> ccc

5. The running count for each word appears in the IDEA console. Because the query uses outputMode("complete"), the full counts table is re-printed on every batch…

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello|    2|
|    a|    1|
|    b|    1|
|  ccc|    1|
+-----+-----+


Package the code and submit it to a Spark cluster (which is how it is usually run in production)

Add a fat-jar plugin to the pom

             <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2-beta-5</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>
                                        xxx.xxx.client.SparkApplication
                                    </mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

Run mvn package…

Download a Spark 3.x release from the official site, unpack it, and run the following from the unpacked directory

./bin/spark-submit \
 --class xxx.xxx.client.SparkApplication \
 --master 'local[2]' \
 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
 /Users/xxx.xxx-latest-SNAPSHOT-jar-with-dependencies.jar \
 1000 >> ~/Desktop/xxxxxxx.temp



Then open ~/Desktop/xxxxxxx.temp to watch the live results.

Introduction

How does Structured Streaming relate to Spark Streaming?
1. They have different programming models.
2. Structured Streaming was introduced in Spark 2.x and is intended to replace Spark Streaming over time.
3. Structured Streaming is part of Spark SQL.

structured-streaming-programming-guide

Source ——> Structured Streaming ——> Sink

The built-in sources are File, Kafka, Socket, and Rate.
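
For quick experiments without a broker, the built-in rate source is handy: it generates rows with timestamp and value columns at a configurable speed. A minimal sketch reusing the spark session from the demo above:

// Read from the built-in rate source instead of Kafka; useful for local testing
Dataset<Row> rates = spark.readStream()
        .format("rate")
        .option("rowsPerSecond", 1)   // one generated row per second
        .load();                      // schema: timestamp TIMESTAMP, value LONG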

How would you integrate an in-house MQ as a Structured Streaming source?

1. Forward the messages consumed from the MQ into a socket and let the Socket source process them. This approach has serious problems: no fault tolerance, no offset management, and a restart loses messages.
2. The proper approach is to implement a custom source, such as a RocketMQ source (see the sketch below).
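
In Spark 3.x a custom streaming source is written against the DataSource V2 API; the heart of it is a MicroBatchStream, whose offset tracking is exactly what fixes the problems of approach 1. Below is a structural sketch only: MyMqOffset and the MQ client calls are hypothetical placeholders, and the surrounding TableProvider/Table/ScanBuilder plumbing is omitted.

import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Skeleton of a custom micro-batch source (e.g. for RocketMQ or an in-house MQ).
// Spark drives the cycle: initialOffset/latestOffset -> planInputPartitions -> commit.
public class MyMqMicroBatchStream implements MicroBatchStream {

    @Override
    public Offset initialOffset() {
        // Where to start on the very first run (e.g. the earliest position in the MQ)
        return null; // TODO: return a starting MyMqOffset (hypothetical class)
    }

    @Override
    public Offset latestOffset() {
        // Newest position available in the MQ; defines the end of the next batch
        return null; // TODO: query the MQ broker
    }

    @Override
    public Offset deserializeOffset(String json) {
        // Restore an offset from the checkpoint log, enabling recovery after a restart
        return null; // TODO: parse a MyMqOffset from JSON
    }

    @Override
    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        // Split the (start, end] range into partitions, one per reader task
        return new InputPartition[0]; // TODO
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
        // Factory for the readers that actually pull messages for one partition
        throw new UnsupportedOperationException("TODO");
    }

    @Override
    public void commit(Offset end) {
        // Called after a batch completes; a safe point to ack/advance the MQ cursor
    }

    @Override
    public void stop() {
        // Release MQ client connections
    }
}

On top of this you still need the TableProvider/Table/ScanBuilder plumbing, plus a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entry so that a short format name resolves; the built-in Kafka connector (spark-sql-kafka-0-10) is a good reference implementation.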

……TODO
