苏州 营销型网站 高端网站,百度推广助手电脑版,如何设置自己的网站,wordpress评论可见前言
大数据应用开发——实时数据采集
大数据应用开发——实时数据处理 Flink完成Kafka中的数据消费#xff0c;将数据分发至Kafka的dwd层中 并在HBase中进行备份
大数据应用开发——数据可视化
hadoop#xff0c;zookeeper#xff0c;kafka#xff0c;flink要开启
目…前言
大数据应用开发——实时数据采集
大数据应用开发——实时数据处理 Flink完成Kafka中的数据消费将数据分发至Kafka的dwd层中 并在HBase中进行备份
大数据应用开发——数据可视化
hadoopzookeeperkafkaflink要开启
目录 题目 Flink完成Kafka中的数据消费将数据分发至Kafka的dwd层中 题目 按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费将数据分发至Kafka的dwd层中并在HBase中进行备份同时建立Hive外表基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中 Flink完成Kafka中的数据消费将数据分发至Kafka的dwd层中
在IDEA下用maven创建flink项目
# 用cmd执行创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupIdorg.apache.flink -DarchetypeArtifactIdflink-quickstart-java -DarchetypeVersionflink版本号# scala版本
mvn archetype:generate -DarchetypeGroupIdorg.apache.flink -DarchetypeArtifactIdflink-quickstart-scala -DarchetypeVersionflink版本号 修改pox.xml文件将flink-connector-kafka_...依赖移出来 demo包下有两个.java PS一个用于批处理另一个用于流处理 public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(master:9092).setTopics(order).setGroupId(my_group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSinkString sink KafkaSink.Stringbuilder().setBootstrapServers(master:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(dwd_order).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStreamString stream env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);// 将数据里的符号去掉DataStreamString text stream.map(new MapFunctionString, String() {Overridepublic String map(String s) throws Exception {return s.replace(,);}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute(Flink Streaming Java API Skeleton);}
}
将代码打包成.jar可以先clean再package 生成位置在当前项目位置/target/项目名称-...jar 放进主节点
# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar
最后可以用flink控制台或kafka-console-consumer.sh查看