時間:2024-02-04 13:45作者:下載吧人氣:26
我們開源了一個訂閱分發mysql的binlog的項目,一直用的非常好,忽然有天開發說能不能支持MongoDB的數據訂閱呢,MongoDB的使用度也挺廣泛的。安排。經過簡單的了解后發現MongoDB也有類似binlog的機制,最終花了兩天時間把功能完成,并統一抽象集成到binlog開源項目中,使用和binlog同一套訂閱分發模型管理MongoDB數據源。整個過程非常順利,比整mysql的binlog要簡單的多了。
先來聊聊MongoDB的主備機制,和mysql的binlog類似,在MongoDB中,有一個系統庫“”Local”,庫里有一個集合“oplog.rs”,這個集合類似于binlog文件,里面記錄了MongoDB的所有操作。從節點通過讀取oplog.rs里的數據做到數據同步。
和訂閱mysql的binlog一樣(模擬一個從節點mysql)。我們的訂閱服務要像從節點那樣讀取解析oplog.rs里的數據。解析前先看下oplog.rs的Document的數據結構
上圖是一個插入的數據的日志,可見oplog的doc中共有如下字段,含義分別如下:
ts
:操作的時間戳(非常重要)
t
:term最初在主數據庫上生成操作的。(含義不明)
h
:本次操作的唯一hashID
v
: 版本號
op
:操作類型,有六種類型,我們只需要關注其中的i(插入)、u(更新)、d(刪除)即可
ns
:庫名和集合名稱,中間使用“.”連接
o
:本次操作的document內容
o2
:只有op操作類型時u更新時,才會有這個字段,代表更新的條件語句
$set
:o2獲取后的文檔里的屬性,代表更新的字段
如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類比為binlog中的Position。同步mysql的數據時,通過記錄消費binlog的位置,也就是Position,可以有效避免訂閱服務停機后,消費記錄丟失的問題。同步MongoDB時,通過記錄ts的值,來記錄消費的位置,可以到達和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務自己查詢,而且oplog在MongoDB4.0之前的版本有大小限制,超過設置的容量后,老的數據就會被丟失,在4.0之后的版本已經解除了這個限制。
上面已經分析了oplog的結構以及訂閱步驟,下面我們直接構建查詢即可,需要注意,每次獲取到的ts值,需要存儲記錄下來,已便重新訂閱時,從上次斷開的記錄重新開始。下面直接看代碼,重點邏輯都以注釋詳盡
private BsonTimestamp queryTs; //如果是首次訂閱,需要使用自然排序查詢,獲取第最后一次操作的操作時間戳。如果是續訂閱直接讀取記錄的值賦值給queryTs即可
FindIterabletsCursor = collection.find().sort(new BasicDBObject(“$natural”, -1))
.limit(1);
Document tsDoc = tsCursor.first();
queryTs = (BsonTimestamp) tsDoc.get(“ts”);
while (true) try {
//構建查詢語句,查詢大于當前查詢時間戳queryTs的記錄
BasicDBObject query = new BasicDBObject(“ts”, new BasicDBObject(“$gt”, queryTs));
MongoCursordocCursor = collection.find(query)
.cursorType(CursorType.TailableAwait) //沒有數據時阻塞休眠
.noCursorTimeout(true) //防止服務器在不活動時間(10分鐘)后使空閑的游標超時。
.oplogReplay(true) //結合query條件,獲取增量數據,這個參數比較難懂,見:https://docs.mongodb.com/manual/reference/command/find/index.html
.maxAwaitTime(1, TimeUnit.SECONDS) //設置此操作在服務器上的最大等待執行時間
.iterator();
while (docCursor.hasNext()) {
Document document = docCursor.next();
//更新查詢時間戳
queryTs = (BsonTimestamp) document.get(“ts”);
//TODO 在這里接收到數據后通過訂閱數據路由分發
String op = document.getString(“op”);
String database = document.getString(“ns”);
Document context = (Document) document.get(“o”);
Document where = null;
if (op.equals(“u”)) {
where = (Document) document.get(“o2”);
if (context != null) {
context = (Document) context.get(“$set”);
}
}
System.err.println(“操作時間戳:” + queryTs.getTime());
System.err.println(“操作類 型:” + op);
System.err.println(“數據庫.集合:” + database);
System.err.println(“更新條件:” + JSON.toJSONString(where));
System.err.println(“文檔內容:” + JSON.toJSONString(context));
}
} catch (Exception e) { e.printStackTrace(); }
}
網友評論