Related Resources
Flink SQL
-- 1) Register the Paimon catalog
CREATE CATALOG paimon_hive
WITH
(
'type' = 'paimon',
'warehouse' = 'hdfs://xxxxx/paimon',
'metastore' = 'hive',
'hive-conf-dir' = '/xxxxx/conf',
'uri' = 'thrift://域名1:9083,thrift://域名2:9083'
);
-- 2) Declare the Kafka source
CREATE TABLE kafkaSource (
`_timestamp` STRING,
`name` STRING,
`age` STRING,
`id` STRING
) WITH (
'connector' = 'kafka',
'format' = 'json',
'topic' = 'topic1234',
'properties.bootstrap.servers' = 'your Kafka brokers',
'properties.group.id' = 'kafka-to-paimon',
'scan.startup.mode' = 'latest-offset'
);
-- 3) Read from Kafka and write into Paimon
INSERT INTO paimon_hive.paimon.odm_kafka_log
SELECT
name AS `name`,
age AS `age`,
id AS `id`,
FROM_UNIXTIME(CAST(CAST(`_timestamp` AS BIGINT) / 1000 AS BIGINT), 'yyyyMMdd') AS `day`
FROM kafkaSource;
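The INSERT above assumes the target table paimon_hive.paimon.odm_kafka_log already exists in the catalog. A minimal sketch of its DDL, with column types taken from the SELECT; the partition and bucket settings are assumptions to adjust for your workload:

-- Hypothetical sink-table DDL; partition/bucket settings are illustrative
CREATE TABLE IF NOT EXISTS paimon_hive.paimon.odm_kafka_log (
    `name` STRING,
    `age` STRING,
    `id` STRING,
    `day` STRING
) PARTITIONED BY (`day`) WITH (
    'bucket' = '4',
    'bucket-key' = 'id'
);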
Flink Table (Java)
Maven dependencies
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
</dependency>
<!-- Flink Table API classes -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
</dependency>
<!-- Paimon dependency -->
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-1.15</artifactId>
<version>0.5.0-incubating</version>
</dependency>
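When the job is submitted to a cluster whose runtime already ships Flink, the Flink artifacts above are typically marked provided so they are not bundled into the job jar; this also lowers the risk of the jar conflicts mentioned in the comment at the end of this post. A sketch for one dependency (apply the same to the other Flink artifacts as needed):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
    <!-- provided: supplied by the cluster classpath at runtime -->
    <scope>provided</scope>
</dependency>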
Job class
package job;
import com.google.protobuf.ByteString;
import function.GalaxyToPaimonFlatMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
/**
 * @Author zhangjinke
 * @Create 2023/12/25 17:02
 * @Description Write Galaxy protobuf-format logs into Paimon
 * @Wiki -
 * @Modifier -
 * @ModificationTime -
 * @Node -
 */
public class GalaxyToPaimonJob {
private static final Logger LOG = LoggerFactory.getLogger(GalaxyToPaimonJob.class);
private static final String GROUP_ID = "job.GalaxyToPaimonJob";
public static void main(String[] args) {
try {
ParameterTool tool = ParameterTool.fromArgs(args);
int source = tool.getInt("source");
int flatmap = tool.getInt("flatmap");
// Kafka consumer properties
Properties galaxyPro = new Properties();
// Broker list read via PropertyUtil (assumed config key)
galaxyPro.setProperty("bootstrap.servers", PropertyUtil.get("bootstrap.servers"));
galaxyPro.setProperty("group.id", GROUP_ID);
// Interval for automatically detecting topic-partition changes
galaxyPro.put("flink.partition-discovery.interval-millis", "60000");
galaxyPro.put("refresh.leader.backoff.ms", "6000");
KafkaSource<ByteString> galaxyKafkaSource = KafkaSource.<ByteString>builder()
.setTopics(PropertyUtil.get("user_event_etl_topic"))
.setValueOnlyDeserializer(new ByteStringSchema())
.setProperties(galaxyPro)
.setStartingOffsets(OffsetsInitializer.latest())
.build();
// 1) Create the Flink streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(120000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000L);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(200, 60000 * 2L));
env.setParallelism(32);
// 2) Add the user+event Kafka source
SingleOutputStreamOperator<Row> rsoso = env.fromSource(galaxyKafkaSource, WatermarkStrategy.noWatermarks(), "GalaxyToPaimonSource")
.uid("GalaxyToPaimonSource_Uid")
.name("GalaxyToPaimonSource_Name")
.setParallelism(source)
// 3) Extract the needed fields and emit them downstream as Rows
.flatMap(new GalaxyToPaimonFlatMap())
.uid("GalaxyToPaimonFlatMapFunction_Uid")
.name("GalaxyToPaimonFlatMapFunction_Name")
.setParallelism(flatmap)
.returns(Types.ROW_NAMED(
new String[]{"realtime", "ip", "session_id", "app_id", "device_uuid", "day", "hour"},
Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING));
// 4) Create the Flink Table execution environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Schema schema = Schema.newBuilder()
.column("realtime", DataTypes.STRING())
.column("ip", DataTypes.STRING())
.column("session_id", DataTypes.STRING())
.column("app_id", DataTypes.STRING())
.column("device_uuid", DataTypes.STRING())
.column("day", DataTypes.STRING())
.column("hour", DataTypes.STRING())
.build();
// 5) Create the Paimon catalog
tableEnv.executeSql("CREATE CATALOG paimon_hive WITH ('type' = 'paimon', 'warehouse'='hdfs://xxxxx/paimon')");
tableEnv.executeSql("USE CATALOG paimon_hive");
// 6) Register the stream as a temporary view
tableEnv.createTemporaryView("odm_event_realtime_view", rsoso, schema);
// 7) Insert into the Paimon table; executeSql submits the INSERT job itself, so no env.execute() call is needed
tableEnv.executeSql("INSERT INTO paimon.odm_event_realtime SELECT * FROM odm_event_realtime_view");
} catch (Exception e) {
LOG.error("GalaxyToPaimonJob failed to start!", e);
}
}
}
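The job references a ByteStringSchema and a PropertyUtil that are not shown; PropertyUtil is presumably a small configuration-lookup helper. A minimal sketch of ByteStringSchema, assuming it simply wraps the raw Kafka record value in a ByteString:

package job;
import com.google.protobuf.ByteString;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
public class ByteStringSchema implements DeserializationSchema<ByteString> {
    @Override
    public ByteString deserialize(byte[] message) {
        // Wrap the raw record value bytes
        return ByteString.copyFrom(message);
    }
    @Override
    public boolean isEndOfStream(ByteString nextElement) {
        // The Kafka stream is unbounded
        return false;
    }
    @Override
    public TypeInformation<ByteString> getProducedType() {
        return TypeInformation.of(ByteString.class);
    }
}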
Function class
package function;
import com.google.protobuf.ByteString;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class GalaxyToPaimonFlatMap extends RichFlatMapFunction<ByteString, Row> {
private static final Logger log = LoggerFactory.getLogger(GalaxyToPaimonFlatMap.class);
private static final DateTimeFormatter inputDateFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
private static final DateTimeFormatter outputDateFormat = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final DateTimeFormatter outputHourFormat = DateTimeFormatter.ofPattern("yyyyMMddHH");
@Override
public void flatMap(ByteString bytes, Collector<Row> out) {
try {
// Create the result Row; its arity must match the 7 fields declared via ROW_NAMED in the job
Row row = new Row(7);
// Populate the fields one by one from the parsed protobuf object
myProtoBufObjDataToProtoBuf.myProtoBufObj myProtoBufObj = myProtoBufObjDataToProtoBuf.myProtoBufObj.parseFrom(bytes);
String realtime = myProtoBufObj.getRealtime();
row.setField(0, realtime);
row.setField(1, myProtoBufObj.getIp());
row.setField(2, myProtoBufObj.getSessionId());
row.setField(3, myProtoBufObj.getAppId());
row.setField(4, myProtoBufObj.getDeviceUuid());
LocalDateTime eventTime = LocalDateTime.parse(realtime, inputDateFormat);
row.setField(5, eventTime.format(outputDateFormat));
row.setField(6, eventTime.format(outputHourFormat));
// Emit the Row
out.collect(row);
} catch (Exception e) {
log.error("function.GalaxyToPaimonFlatMap error is: ", e);
}
}
}
Comments (1)
zjk (author)
When writing to Paimon via the Flink Table approach, the job may fail at startup with jar conflicts; see the fix described at: https://help.aliyun.com/zh/flink/support/common-sql-errors#section-ewf-3ce-f25