Doris的Routine Load操作

作者:Administrator 发布时间: 2026-02-24 阅读量:5 评论数:0
-- Template: create a Routine Load job that consumes a Kafka topic into a Doris table.
-- The bracketed names are placeholders to substitute before running:
--   [数据库名] = database name, [任务名] = job name, [目标表] = target table.
-- The credentials below are sample values; replace them with real ones.
CREATE ROUTINE LOAD [数据库名].[任务名] ON [目标表]
COLUMNS (列1, 列2, ...)  -- column mapping (optional)
PROPERTIES
(
    "format" = "json",
    "desired_concurrent_number" = "3",  -- concurrency (recommended <= number of Kafka partitions)
    "max_batch_interval" = "20",        -- interval of a single import batch (seconds)
    "max_batch_rows" = "200000",        -- maximum rows per batch
    "max_batch_size" = "209715200"      -- size of a single batch (bytes)
)
FROM KAFKA
(
    "kafka_broker_list" = "kafka1:9092,kafka2:9092",  -- Kafka cluster addresses
    "kafka_topic" = "your_topic",                     -- topic name
    "property.group.id" = "doris_consumer_group",      -- consumer group
    "property.security.protocol" = "SASL_PLAINTEXT",   -- security protocol (optional)
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "admin",
    "property.sasl.password" = "password"
);

监控任务状态

-- List the Routine Load jobs of the current database.
-- NOTE(review): per the Doris docs, without ALL the stopped/cancelled jobs are
-- not listed; use SHOW ALL ROUTINE LOAD to include them — confirm for your version.
SHOW ROUTINE LOAD;

-- Show the details of one job (consumption progress, error information).
-- \G is the MySQL client's vertical-output terminator and takes the place of `;`.
SHOW ROUTINE LOAD FOR [任务名]\G

-- List every job in the ods database.
USE ods;
SHOW ALL ROUTINE LOAD;

管理任务

-- Pause the job
PAUSE ROUTINE LOAD FOR [任务名];

-- Resume the job
RESUME ROUTINE LOAD FOR [任务名];

-- Delete the job
-- NOTE(review): STOP appears to end the job permanently (it cannot be resumed
-- afterwards) — confirm against the Doris documentation before running.
STOP ROUTINE LOAD FOR [任务名];
-- Routine Load job ods.sync_bs_app_intercept_request_data_kafka:
-- loads JSON messages from the app_intercept_request_data Kafka topic into
-- ods_bs_app_intercept_request_data_rl, with the default offsets set to
-- OFFSET_BEGINNING.
CREATE ROUTINE LOAD ods.sync_bs_app_intercept_request_data_kafka
    ON ods_bs_app_intercept_request_data_rl
COLUMNS (
    imei,
    app_package,
    version,
    model,
    ip,
    province,
    city,
    district,
    platform,
    standard_package,
    intercept_level,
    occur_date,
    week_date,
    month_date,
    create_time
)
PROPERTIES (
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "rom-kafka-0:9092,rom-kafka-1:9092,rom-kafka-2:9092",
    "kafka_topic" = "app_intercept_request_data",
    "property.group.id" = "doris_consumer_group",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);


-- Lifecycle commands for the ods.sync_bs_app_intercept_request_data_kafka job.
-- NOTE(review): these look like a reference list of individual commands rather
-- than a script to run top to bottom (a STOP followed by a PAUSE on the same job
-- is unlikely to be intended) — confirm before executing them together.

-- Resume the job
RESUME ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;

-- Stop the job
STOP ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;

-- Pause the job
PAUSE ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;

-- Example job: load JSON messages from the test-routine-load-json topic into
-- test_routineload_tbl. The jsonpaths list gives one JSON path per entry of the
-- COLUMNS list (user_id, name, age).
CREATE ROUTINE LOAD testdb.example_routine_load_json
    ON test_routineload_tbl
COLUMNS (
    user_id,
    name,
    age
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-json",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

{
	"lv1": "inspect",
	"lv2": "competitor",
	"lv3": "",
	"mt": 1752998857000,
	"task_id": "1125072005390201010797",
	"site_no": "01010797",
	"competitor_feedback": "",
	"competitor_proof": "manager/change/20250720/01010797/691272.jpg",
	"competitor_type": "无",
	"price_diff": "未知"
}

-- Target table for the inspection-task competitor feedback stream.
-- It is a UNIQUE KEY table keyed on site_no and hash-distributed on the same
-- column; bucket count is left to BUCKETS AUTO and three replicas are requested.
CREATE TABLE test.ods_quna_inspect_competitor_last_rl (
    `site_no` VARCHAR(255) NULL COMMENT '机器编号',
    `status`  STRING            COMMENT '审核状态字段(审核通过或待审核的:枚举值)',
    `mt`      DATETIME     NULL COMMENT '更新时间',
    `dt`      DATE         NULL COMMENT '日期'
)
ENGINE = OLAP
UNIQUE KEY(`site_no`)
COMMENT '巡检任务 竞品反馈信息 最新一条信息 实时同步'
DISTRIBUTED BY HASH(`site_no`) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);

-- Routine Load job that feeds test.ods_quna_inspect_competitor_last_rl from the
-- ods_quna_inspect_competitor_kafka topic.
--
-- The source columns are site_no, status and t_mt; mt and dt are derived from
-- t_mt. t_mt is divided by 1000 before FROM_UNIXTIME, i.e. it is treated as an
-- epoch value in milliseconds (the sample message in this file carries
-- "mt": 1752998857000).
--
-- jsonpaths lists one JSON path per source column, in COLUMNS order. It is
-- needed because the source column t_mt has no same-named key in the message;
-- its value comes from the "mt" key.
-- NOTE(review): the sample message shown in this file has no "status" key, so
-- "$.status" is an assumption — confirm the real field name with the producer.
CREATE ROUTINE LOAD test.ods_quna_inspect_competitor_last_rl
    ON ods_quna_inspect_competitor_last_rl
COLUMNS (
    site_no,
    `status`,
    t_mt,
    mt = FROM_UNIXTIME(t_mt/1000, 'yyyy-MM-dd HH:mm:ss'),
    dt = FROM_UNIXTIME(t_mt/1000, 'yyyy-MM-dd')
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.site_no\",\"$.status\",\"$.mt\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "qzsh-storenode01:9092,qzsh-storenode02:9092,qzsh-storenode03:9092",
    "kafka_topic" = "ods_quna_inspect_competitor_kafka",
    "property.group.id" = "doris_consumer_group_20250730",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

评论