千家信息网

DynamoShake怎么从dynamodb迁移到mongodb

发表于:2025-11-08 作者:千家信息网编辑
千家信息网最后更新 2025年11月08日,DynamoShake怎么从dynamodb迁移到mongodb,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。DynamoShake基本
千家信息网最后更新 2025年11月08日DynamoShake怎么从dynamodb迁移到mongodb

DynamoShake怎么从dynamodb迁移到mongodb,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

DynamoShake基本功能

DynamoDB支持全量和增量的同步,进程启动后会先进行全量同步,全量同步结束后进入增量同步的阶段。
全量同步分为数据同步和索引同步两部分,数据同步用于同步数据,数据同步结束后将会进行索引的同步,索引同步会同步默认的primary key,用户自建的索引GSI如果MongoDB是副本集支持,集群版目前暂时不支持同步。
增量同步只同步数据,不同步增量同步过程中产生的索引。
此外,全量和增量同步阶段不支持对原来的库表进行DDL操作,比如删表,建表,建索引等。

断点续传

全量同步不支持断点续传功能,增量同步支持断点续传,也就是说如果增量断开了,一定时间内恢复是可以只进行增量的断点续传。但在某些情况下,比如断开的时间过久,或者之前的位点(参考下文)丢失,那么都会导致重新触发全量同步。

同步数据

所有源端的表会写入到目的的一个库(默认是dynamo-shake)的不同表中,比如用户有table1,table2,那么同步完后,目的端会有个dynamo-shake的库,库里面有table1和table2的表。
在原生的dynamodb中,协议是包裹了一层类型字段,其格式是"key: type: value"格式,例如用户插入了一条{hello: 1},那么dynamodb接口获取的数据是{"hello": {"N": 1}}的格式。
Dynamo所有的数据类型:

  • String

  • Binary

  • Number

  • StringSet

  • NumberSet

  • BinarySet

  • Map

  • List

  • Boolean

  • Null

那么我们提供2种转换方式,raw和change,其中raw就是按照裸的dynamodb接口获取的数据写入:

rszz-4.0-2:PRIMARY> use dynamo-shakeswitched to db dynamo-shakerszz-4.0-2:PRIMARY> db.zhuzhao.find(){ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : { "L" : [ { "S" : "aa1" }, { "N" : "1234" } ] }, "hello_world" : { "S" : "f2" } }{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : { "N" : "222" }, "qqq" : { "SS" : [ "h2", "h3" ] }, "hello_world" : { "S" : "yyyyyyyyyyy" }, "test" : { "S" : "aaa" } }{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : { "L" : [ { "N" : "0" }, { "N" : "1" }, { "N" : "2" } ] }, "hello_world" : { "S" : "测试中文" } }

change表示剥离类型字段:

rszz-4.0-2:PRIMARY> use dynamo-shakeswitched to db dynamo-shakerszz-4.0-2:PRIMARY> db.zhuzhao.find(){ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : [ "aa1", 1234 ] , "hello_world" : "f2" }{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : 222, "qqq" : [ "h2", "h3" ] , "hello_world" : "yyyyyyyyyyy", "test" : "aaa" }{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : [ 0, 1, 2 ], "hello_world" : "测试中文" }

用户可以根据自己的需求制定自己的同步类型。

位点

增量的断点续传是根据位点来实现的,默认的位点是写入到目的MongoDB中,库名是dynamo-shake-checkpoint。每个表都会记录一个checkpoint的表,同样还会有一个status_table表记录当前是全量同步还是增量同步阶段。

rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpointswitched to db dynamo-shake42-checkpointrszz-4.0-2:PRIMARY> show collectionsstatus_tablezz_incr0zz_incr1rszz-4.0-2:PRIMARY>rszz-4.0-2:PRIMARY>rszz-4.0-2:PRIMARY> db.status_table.find(){ "_id" : ObjectId("5d6e0ef77e592206a8c86bfd"), "key" : "status_key", "status_value" : "incr_sync" }rszz-4.0-2:PRIMARY> db.zz_incr0.find(){ "_id" : ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id" : "shardId-00000001567391596311-61ca009c", "father_id" : "shardId-00000001567375527511-6a3ba193", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id" : "shardId-00000001567406847810-f5b6578b", "father_id" : "shardId-00000001567391596311-61ca009c", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef17e592206a8c86456"), "shard_id" : "shardId-00000001567422218995-fe7104bc", "father_id" : "shardId-00000001567406847810-f5b6578b", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef17e592206a8c86460"), "shard_id" : "shardId-00000001567438304561-d3dc6f28", "father_id" : "shardId-00000001567422218995-fe7104bc", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id" : "shardId-00000001567452243581-ed601f96", "father_id" : "shardId-00000001567438304561-d3dc6f28", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef17e592206a8c86474"), "shard_id" : "shardId-00000001567466737539-cc721900", "father_id" : "shardId-00000001567452243581-ed601f96", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }{ "_id" : ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id" : "shardId-00000001567481807517-935745a3", "father_id" : "shardId-00000001567466737539-cc721900", "seq_num" : "", "status" : "done", "worker_id" : "unknown-worker", "iterator_type" : "LATEST", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAGsTOg0+3HY+yzzD1cTzc7TPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date" : "" }{ "_id" : ObjectId("5d6e1d807e592206a8c9a102"), "shard_id" : "shardId-00000001567497561747-03819eba", "father_id" : "shardId-00000001567481807517-935745a3", "seq_num" : "39136900000000000325557205", "status" : "in processing", "worker_id" : "unknown", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh65koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh3iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh5QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th45/5rLTGPYJZA==", "update_date" : "" }

其中status_table表中"status_value" : "incr_sync"表示进入了增量阶段。增量的每个shard都会记录一个checkpoint,关于具体shard分裂的规则可以参考dynamodb的guan'fa官方文档。下面是增量表checkpoint的各个字段的说明:

  • _id:mongodb自带主键id

  • shard_id:shard的id,每个shard有一个唯一的id

  • father_id:父shard的id,shard可能有一个父shard。

  • seq_num: 目前处理到的shard内部的sequence number,这个是主要的位点信息。

  • status: 目前同步的阶段,一共有以下几个状态:

    • "not process": 未处理

    • "no need to process": 没有必要处理

    • "prepare stage":准备处理

    • "in processing": 处理中

    • "wait father finish":等待父节点处理完毕再进行处理

    • "done": 处理完毕

  • worker_id:处理的worker id,目前暂未启用

  • iterator_type:shard的遍历方式

  • shard_it:shard的迭代器地址,次要位点信息。

  • update_date:checkpoint更新的时间戳

索引

根据默认的primary key创建一个唯一索引,并且根据partition key创建shard key。用户自己的索引gsi目前不进行创建。

DynamoShake内部架构

本小节主要介绍DynamoShake的部分架构细节

全量同步

下图是基本的一个table的数据同步架构图(dynamo-shake会启动多个并发线程tableSyncer进行拉取,用户可控并发度),fetcher线程从源端dynamodb拉取数据后将数据推入队列,紧接着parser线程从队列中拿取数据并进行解析(dynamo协议转bson),executor负责聚合部分数据并写入mongodb。

  • fetcher。目前fetcher线程只有1个,用的是协议转换驱动是aws提供的driver。fetcher的原理是调用driver进行批量抓取源库的数据,抓到了就塞入队列中,直到抓完当前table的所有数据。fetcher单独分离出来主要是出于网络IO考虑的,目前拉取受网络影响,会比较慢。

  • parser。parser可以启动多个,默认目前是2个,用户可以通过FullDocumentParser进行控制。其主要就是从队列中读取数据,并解析成bson结构。parser解析后,数据按条写入executor的队列。parser线程单独独立出来主要是出于解析比较耗CPU资源考虑。

  • executor。executor也可以启动多个,默认目前是4个,用户可以通过FullDocumentConcurrency进行控制。executor从队列中拉取,并进行batch聚合(聚合上限16MB,总条数1024)后写入目的mongodb。
    当前所有表的数据写入完后,tableSyncer将会退出。

增量同步

增量整体架构如下:


Fetcher线程负责感知stream中shard的变化,Manager负责进行消息的通知,或者创建新的Dispatcher进行消息的处理,一个shard对应一个Dispatcher。Dispatcher从源端拉取增量数据,并通过Batcher进行数据解析和打包整合,然后通过executor进行写入到MongoDB,同时会更新checkpoint。另外,如果是断点续传,那么Dispatcher会从旧的checkpoint位点开始拉取,而不是从头开始拉。

DynamoShake的使用

启动:./dynamo-shake -conf=dynamo-shake.conf,配置参数在dynamo-shake.conf中指定,以下是各个参数的意义:

  • id: 修改会影响MongoDB上目的库的名字

  • log.file:日志文件,不配置将打印到标准输出

  • log.level: log级别。推荐默认。

  • log.buffer: 打印是否带缓存。推荐默认。

  • system_profile:打印内部堆栈的端口号。推荐默认。

  • http_profile:暂未启用

  • sync_mode:同步模式,all表示全量+增量,full表示仅全量,incr表示仅增量(目前不支持)

  • source.access_key_id: dynamodb连接配置参数

  • source.secret_access_key: dynamodb连接配置参数

  • source.session_token: dynamodb连接配置参数,没有可以留空

  • source.region: dynamodb连接配置参数

  • filter.collection.white:过滤白名单,只同步指定的表

  • filter.collection.black:过滤黑名单,不通过指定的表。

  • qps.full:全量阶段限速,1秒钟发送多少个请求

  • qps.full.batch_num:全量阶段限速,1个请求最多包括多少个item。

  • qps.incr:增量阶段限速,1秒钟发送多少个请求

  • qps.incr.batch_num:增量阶段限速,1个请求最多包括多少个item。

  • target.type:目的端配置,目前仅支持mongodb

  • target.address: 目的端mongodb的连接串地址。

  • target.mongodb.type: mongodb是replica还是sharding

  • target.mongodb.exist:如果目的库同名表存在,执行什么行为。drop表示删除,rename表示重命名,留空表示不处理。

  • full.concurrency:全量同步的线程个数,1个线程对应1个表

  • full.document.concurrency:全量同步1个表内的并发个数。

  • full.document.parser:1个表内parser线程的个数

  • full.enable_index.primary:是否同步dynamodb的primary key。

  • full.enable_index.user:是否同步用户自建的索引,目前不支持

  • convert.type:写入的模式,raw表示裸写入, change表示解析类型字段后写入,参考上述文档。

  • increase.concurrency:增量同步并发参数,1次最多抓取的shard个数

  • checkpoint.address = checkpont的存储地址,默认不配置与目的库一致。

  • checkpoint.db = checkpoint写入的db的名字,默认是$db-checkpoint。

DynamoFullCheck

DynamoFullCheck是一个用于校验DynamoDB和MongoDB数据是否一致的工具,目前仅支持全量校验,不支持增量,也就是说,如果增量同步阶段,那么源和目的是不一致的。
DynamoFullCheck只支持单向校验,也就是校验DynamoDB的数据是否是MongoDB的子集,反向不进行校验。
另外,还支持抽样校验,支持只校验感兴趣的表。
校验主要分为以下几部分:

  • 轮廓校验。首先,校验两边的表中数目是否一致;接着,校验索引是否一致(目前没做索引校验)。注意,如果表中数目不一致,将会直接退出,不会进行后续的校验。

  • 精确校验。精确校验数据,原理是从源端拉取数据并解析,如果有唯一索引,那么根据唯一索引查找MongoDB的doc,并对比一致性;如果没有唯一索引,那么会根据整个doc在MongoDB中进行查找(比较重)。
    抽样原理:

精确校验的时候,如果启用抽样,那么会对每个doc进行抽样,判断当前doc是否需要抽样。原理比较简单,比如按30%抽样,那么再0~100中产生一个随机数,如果是0~30的就校验,反之不校验。
DynamoFullCheck由于从源DynamoDB拉取也需要经过fetch,parse阶段,所以一定程度上,该部分代码复用了DynamoShake,不同的是DynamoFullCheck内部各个fetcher, parser, executor线程并发度都是1。

使用参数

full-check参数稍微简单点,直接用的命令行注入,例如:./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300

Usage:  dynamo-full-check.darwin [OPTIONS]Application Options:  -i, --id=                    target database collection name (default: dynamo-shake)  -l, --logLevel=  -s, --sourceAccessKeyID=     dynamodb source access key id      --sourceSecretAccessKey= dynamodb source secret access key      --sourceSessionToken=    dynamodb source session token      --sourceRegion=          dynamodb source region      --qpsFull=               qps of scan command, default is 10000      --qpsFullBatchNum=       batch number in each scan command, default is 128  -t, --targetAddress=         mongodb target address  -d, --diffOutputFile=        diff output file name (default: dynamo-full-check-diff)  -p, --parallel=              how many threads used to compare, default is 16 (default: 16)  -e, --sample=                comparison sample number for each table, 0 means disable (default: 1000)      --filterCollectionWhite= only compare the given tables, split by ';'      --filterCollectionBlack= do not compare the given tables, split by ';'  -c, --convertType=           convert type (default: raw)  -v, --version                print versionHelp Options:  -h, --help                   Show this help message

看完上述内容,你们掌握DynamoShake怎么从dynamodb迁移到mongodb的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

同步 数据 增量 索引 支持 阶段 处理 目的 线程 参数 用户 配置 一致 位点 断点 队列 抽样 类型 部分 个数 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 公安网络安全监管工作谋划 云端储存服务器 查询数据库中科学计数法字段 模电数电对软件开发有影响吗 齐齐哈尔建华区网络安全宣传周 用友t3还用服务器吗 外服吃鸡为什么会服务器繁忙 北京软件开发如何收费 寒假护苗网络安全小结 网络技术专员招聘条件 开展网络安全检查活动 国家网络安全法感想 网络安全防护技术视频 为什么量子特攻无法连接服务器 在北京做软件开发辛苦还是在深圳 秋田犬数据库 进口串口设备服务器报价 网络安全意识ppt免费 计算机软件开发是干嘛的 数据库毕业有前途吗 福田管理软件开发找哪家公司 松江区即时网络技术五星服务 安装数据库一直显示错误 安全通防攻击服务器 双梦是哪个服务器贴吧 筑牢网络安全屏障织密 202年国家网络安全宣传周 微软亚洲研究院软件开发面试 服务器集群怎么实现冗余 软件开发转销售成功案例
0