Writing a Kafka Producer and Consumer in Scala
Project overview
This is an experimental project.
Goal: real-time stream computation and display with Flink running in HA (high-availability) mode.
Architecture: Flink + Kafka + ZooKeeper + Hadoop + web visualization (visualization layer to be decided).
Data format
{"ID":978,"from":"福建省","to":"青海省","time":"2021-06-21 14:52:47","value":1458}
ID is an incrementing sequence number assigned when the record is generated; time is the record timestamp (by default, the generation time); from and to are randomly chosen provinces; value is a randomly generated number.
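For the Flink job later on, each message will need to be parsed back into typed fields. Here is a minimal sketch using the fastjson library already declared in the pom below; the Record case class and parseRecord helper are illustrations of mine, not part of the project code:
import com.alibaba.fastjson.JSON

// Hypothetical typed view of one message (illustrative only).
case class Record(id: Int, from: String, to: String, time: String, value: Int)

def parseRecord(msg: String): Record = {
  val obj = JSON.parseObject(msg) // fastjson: JSON string -> JSONObject
  Record(obj.getIntValue("ID"), obj.getString("from"), obj.getString("to"),
         obj.getString("time"), obj.getIntValue("value"))
}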
Still a work in progress: the random province + city combination is not finished (cities are missing), and the Flink side comes next.
For now Kafka runs in single-broker mode on the master node (10.0.20.181) only; a multi-node setup will be tried later.
Workflow
Create a Maven project in IDEA.
Install ZooKeeper and Kafka on the server and start both.
Create a topic in Kafka, open a console consumer, then run the Maven program. Note that the broker listens on port 9092. Typical commands are sketched below.
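A plausible command sequence for these steps, assuming Kafka 2.4 is installed in its default layout on the master node (adjust paths and addresses to your installation):
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties   # start ZooKeeper
bin/kafka-server-start.sh -daemon config/server.properties          # start the broker on port 9092
# create the topic used below; single broker, so replication factor 1
bin/kafka-topics.sh --create --bootstrap-server 10.0.20.181:9092 --replication-factor 1 --partitions 1 --topic test_1
# watch incoming messages from a console consumer
bin/kafka-console-consumer.sh --bootstrap-server 10.0.20.181:9092 --topic test_1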
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.mn</groupId>
    <artifactId>projet_test_juin</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.3</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.test.MainClassName</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The pom above pulls in both the Flink and Kafka dependencies for the IDEA/Maven project. This particular program does not touch Flink yet, so the Flink dependencies can be omitted; only the Kafka ones are required.
Producer
package kafka

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import scala.util.Random

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.0.20.181:9092") // broker address (single-broker setup)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")   // key serializer
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // value serializer
    val rand = new Random()
    val max_records = 1000 // send 1000 records
    val producer = new KafkaProducer[String, String](props) // create the producer
    val startTime = System.currentTimeMillis() // start time in milliseconds
    // candidate provinces for the from/to fields; built once, outside the loop
    val province = List("北京", "天津", "河北省", "山西省", "内蒙古自治区", "辽宁省", "吉林省", "黑龙江省", "上海",
      "江苏省", "浙江省", "安徽省", "福建省", "江西省", "山东省", "河南省", "湖北省", "湖南省", "广东省", "广西壮族自治区",
      "海南省", "重庆", "四川省", "贵州省", "云南省", "西藏自治区", "陕西省", "甘肃省", "青海省", "宁夏回族自治区",
      "新疆维吾尔自治区", "台湾省", "香港特别行政区")
    //val city = List("北京", "沈阳", "太原", "银川") // city granularity: still to do
    for (i <- 1 to max_records) {
      // from / to: randomly picked provinces (province + city planned later)
      val from = province(rand.nextInt(province.length))
      val to = province(rand.nextInt(province.length))
      val value = rand.nextInt(5000)
      // record timestamp; note lowercase yyyy (uppercase YYYY is the week-based year)
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
      // assemble the JSON message
      val message = new JSONObject()
      message.put("ID", i)
      message.put("from", from)
      message.put("to", to)
      message.put("time", time)
      message.put("value", value)
      Thread.sleep(100 + rand.nextInt(100)) // throttle: 100-200 ms between records
      val data = new ProducerRecord[String, String]("test_1", message.toString())
      val future = producer.send(data)
      val answer = future.get() // block until the broker acknowledges
      println("message has been sent to topic: " + answer.topic())
    }
    println("records per second: " + max_records * 1000 / (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
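Because future.get() blocks on every record, the loop above is fully synchronous; that is fine for a demo but caps throughput at one in-flight record. For reference, a sketch of the non-blocking variant using the Callback overload of send from the same kafka-clients API (a drop-in replacement for the send/get lines inside the loop):
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Async send: the broker acknowledgement arrives on the producer's I/O thread.
producer.send(data, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()   // send failed
    else println("acked at offset " + metadata.offset()) // send succeeded
  }
})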
Consumer
package kafka

import java.time.Duration
import java.util.concurrent._
import java.util.{Collections, Properties}
import scala.collection.JavaConversions._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

/**
 * Companion class that consumes messages from a topic.
 */
class KafkaProducerConsumer(val brokers: String,
                            val groupId: String,
                            val topic: String) {

  val props = createConsumerConfig(brokers, groupId)
  val consumer = new KafkaConsumer[String, String](props)

  // Close the consumer when we are done with it.
  def shutdown() = {
    if (consumer != null)
      consumer.close()
  }

  // Build the consumer configuration.
  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) // Kafka cluster address
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") // commit offsets automatically
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  // Poll the topic forever on a dedicated thread.
  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic)) // a one-topic subscription list
    Executors.newSingleThreadExecutor.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          val records = consumer.poll(Duration.ofSeconds(1)) // wait at most 1 second per poll
          for (record <- records) {
            println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
          }
        }
      }
    })
  }
}

object KafkaProducerConsumer { // companion object
  def main(args: Array[String]): Unit = {
    val brokers = "10.0.20.181:9092"
    val groupId = "org.mn"
    val topic = "test_1"
    val test = new KafkaProducerConsumer(brokers, groupId, topic)
    test.run()
  }
}
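One caveat (my note, not from the original code): shutdown() is never actually called, and KafkaConsumer is not thread-safe, so invoking close() from outside the polling thread is unsafe. The usual pattern is to call consumer.wakeup() from the other thread and let the polling loop exit on the resulting WakeupException; a sketch of how the run() body could be adapted:
import org.apache.kafka.common.errors.WakeupException

// wakeup() is the one KafkaConsumer method safe to call from another thread;
// it makes a blocked poll() throw WakeupException inside the polling thread.
def safeShutdown(): Unit = consumer.wakeup()

// body of the polling thread's run():
try {
  while (true) {
    val records = consumer.poll(Duration.ofSeconds(1))
    for (record <- records) println(record.value())
  }
} catch {
  case _: WakeupException => // expected during shutdown; nothing to do
} finally {
  consumer.close() // safe here: same thread that polled
}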
Recommended reference: several very good blogs from which I learned a lot.