全部产品
Search
文档中心

console命令工具

更新时间: 2021-08-02

console命令行工具

操作视频

准备工作

下载命令行工具进行datahub_console.tar.gz解压,在conf/datahub.properties中填写ak和endpoint信息,配置完成后在bin目录执行脚本

注意:

  • jdk>=1.8版本

使用指南

基础操作

  • 查看所有可执行命令
  • help后输入其他命令可查看该命令所需参数,例如:help lt即可查看lt所需要的参数
help
  • 清除屏幕
clear
  • 退出程序
exit
  • 查看命令报错详情,命令报错后可使用stacktrace查看具体错误
stacktrace
  • 运行多命令脚本
script

Project操作

  • 创建Project
cp -p test_project  -c test_comment
  • 删除Project
dp -p test_project
  • 获取Project列表
lp

Topic操作

  • 创建Topic
    • -m参数不同的Topic类型,BLOB代表创建BLOB类型的Topic,Tuple表示创建Tuple类型的Topic
    • Tuple类型 Topic字段格式为[(fieldName,fieldType,isNull)],多个字段以逗号隔开
ct -p test_project -t test_topic -m TUPLE -f [(name,string,true)] -s 3 -l 3 -c test_comment
  • 删除Topic
dt test_project test_topic
  • 获取Topic信息
gt -p test_project -t test_topic
  • 导出Topic schema结构为json文件
gts -f filepath -p test_project -t test_topic
  • 获取Topic列表
lt -p test_project
  • 导入Json文件创建Topic
rtt -s 3 -l 3 -c test_comment -f filepath -p test_project -t test_topic

Connector操作

  • 创建odps connector
    • -m参数表示不同的同步类型,目前同步到odps的有SYSTEM_TIME、USER_DEFINE、EVENT_TIME、META_TIME四种同步类型
    • -tr参数表示分区的时间间隔,console工具默认为60分钟
    • -tf参数 分区格式,ds 表示按天分区,ds hh表示按小时分区,ds hh mm表示按分钟分区
coc -p test_project -t test_topic -m SYSTEM_TIME -e odpsEndpoint -op odpsProject -ot odpsTable -oa odpsAccessId -ok odpsAccessKey -tr 60 -c (field1,field2) -tf ds hh mm
  • 新增connector字段
acf -p test_project -t test_topic -c connectorId -f fieldName
  • 创建同步到mysql rds connector
    • -ty参数表示同步的类型,共有两种
      • mysql表示创建同步到mysql的connector
      • ads 表示创建同步到ads的connector
    • -ht表示插入方式,共有两种
      • IGNORE
      • OVERWRITE
    • -n表示同步的字段,示例:(field1,field2)
cdc -p test_project -t test_topic -h host -po 3306 -ty mysql -d mysql_database -ta msyql_table -u username -pa password -ht IGNORE -n (field1,field2)
  • 创建 datahub connector
    • -m表示认证类型
      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
cdhc -p test_project -t test_topic -sp sourceProject -st sourceTopic -m AK -i accessid k accessKey
  • 创建fc connector
    • -au表示认证方式
      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
    • -n表示同步的字段,例如:(field1,field2)
cfc -p test_project -t test_topic -e endpoint -s service -f function -au AK -i accessId -k accessKey -n (field1,field2)
  • 创建Hologres connector
    • -au表示认证方式,目前同步到holo只支持AK认证
    • -m表示解析类型,选择Delimiter需要指定lineDelimiter、parseData、columnDelimiter属性,选择IngormaticaJson需要指定parseData属性
      • Delimiter
      • InformaticaJson
chc -p test_project -t test_topic -e endpoint -cl (field,field2) -au AK -hp holoProject -ht holoTopic -i accessId -k accessKey -m Delimiter -l 1 -b false -n (field1,field2)
  • 创建ots connector
    • -m表示认证类型,默认使用STS
      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
    • -wm表示写入方式,支持两种写入方式
      • PUT
      • UPDATE
    • -c表示同步的字段,例如:(field1,field2)
cotsc -p test_project -t test_topic -i accessId -k accessKey -it instanceId -m AK -t table -wm PUT -c (field1,field2)
  • 创建 oss connector
csc -p test_project -t test_topic -b bucket -e endpoint -pr ossPrefix -tf ossTimeFormat -tr timeRange -c (f1,f2)
  • 删除connector
    • 可传入多个connectorid,以空格分隔
dc -p test_project -t test_topic -c connectorId
  • 获取connector详情信息
gc -p test_project -t test_topic -c connectorId
  • 获取某个Topic下面的connector列表
lc -p test_project -t test_topic
  • 重启connector
rc -p test_project -t test_topic -c connectorId
  • 更新connector ak
uca -p test_project -t test_topic -c connectorId -ty connectorType -a accessId -k accessKey

shard操作

  • 合并shard
ms -p test_project -t test_topic -s shardId -a adjacentShardId
  • 分裂shard
ss -p test_project -t test_topic -s shardId
  • 获取某个topic下面的所有shard
ls -p test_project -t topicName
  • 获取同步shard的状态
gcs -p test_project -t test_topic -s shardId -c connectorId
  • 获取订阅消费的每个shard点位
gso -p test_project -t test_topic -s subid -i shardId

订阅操作

  • 创建订阅
css -p test_project -t test_topic -c comment
  • 删除订阅
dsc -p test_project -t test_topic -s subId
  • 查询订阅列表
lss -p test_project -t test_topic

上传下载数据

  • 上传数据
    • -f参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
    • -m参数表示文本分隔符,目前支持逗号、空格分隔符
    • -n 参数表示每次上传batchsize大小,默认为1000
uf -f filepath -p test_topic -t test_topic -m "," -n 1000

示例: CSV文件上传

下面以CSV文件为例,说明下如何使用console工具将CSV文件上传到DataHub数据。CSV文件的格式如下图所示:

1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

上述CSV文件中每行一条Record,按照(,)区分字段。保存在本地路径/temp/test.csv中。DataHub Topic格式如下:

字段名称 字段类型
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP

使用console工具命令如下

uf -f /temp/test.csv -p test_topic -t test_topic -m "," -n 1000
  • 下载数据
    • -f参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
    • -ti参数表示读取该时间之后的点位,格式为:yyyy-mm-dd hh:mm:ss
    • -l参数表示每次读取的数量
    • -g参数表示是否一直读
      • 0表示只读一次,即获取当前recordsize后不再消费
      • 1表示一直读取
down -p test_project -t test_topic -s shardId -d subId -f filePath -ti "1970-01-01 00:00:00" -l 100 -g 0

常见问题

  • 脚本启动失败:windows环境下运行脚本检查脚本路径是否包含括号