Java源码示例:com.alibaba.otter.canal.protocol.CanalEntry.RowData
示例1
public static <T extends Object> PairWrapper<String, Object> convertProtobufRecord(EntryHeader header, RowData rowData) {
PairWrapper<String, Object> wrapper = new PairWrapper<>();
List<Column> Columns = null;
if (header.isInsert() || header.isUpdate()) {
Columns = rowData.getAfterColumnsList();
} else if (header.isDelete()) {
Columns = rowData.getBeforeColumnsList();
}
wrapper.addProperties(Constants.MessageBodyKey.POS, header.getPos());
wrapper.addProperties(Constants.MessageBodyKey.OP_TS, header.getTsTime());
Map<String, Object> map = convert2map(Columns);
for (Map.Entry<String, Object> entry : map.entrySet()) {
wrapper.addPair(new Pair<>(entry.getKey(), CharSequence.class.isInstance(entry.getValue()) ? entry.getValue().toString() : entry.getValue()));
}
return wrapper;
}
示例2
/**
* 创建DBusMessageBuilder对象,同时生成ums schema
*/
private DbusMessageBuilder createBuilderWithSchema(MetaVersion version, MessageEntry msgEntry) {
DbusMessageBuilder builder = new DbusMessageBuilder();
String namespace;
if (outputTablePartition()) {
namespace = builder.buildNameSpace(Utils.getDataSourceNamespace(),
msgEntry.getEntryHeader().getSchemaName(), msgEntry.getEntryHeader().getTableName(),
version.getVersion(), msgEntry.getEntryHeader().getPartitionTableName());
} else {
namespace = builder.buildNameSpace(Utils.getDataSourceNamespace(),
msgEntry.getEntryHeader().getSchemaName(), msgEntry.getEntryHeader().getTableName(),
version.getVersion());
}
builder.build(DbusMessage.ProtocolType.DATA_INCREMENT_DATA, namespace, table.getBatchId());
EventType eventType = msgEntry.getEntryHeader().getOperType();
RowData rowData = msgEntry.getMsgColumn().getRowDataLst().get(0);
List<Column> columns = Support.getFinalColumns(eventType, rowData);
for (Column column : columns) {
String colType = Support.getColumnType(column);
// 这里避免,源端表中包含ums_id_/ums_ts_等字段
if (Support.isSupported(colType) && !builder.getMessage().containsFiled(column.getName())) {
builder.appendSchema(column.getName(), DataType.convertMysqlDataType(colType), true);
}
}
return builder;
}
示例3
public static <T extends Object> PairWrapper<String, Object> convertProtobufRecordBeforeUpdate(EntryHeader header, RowData rowData) {
PairWrapper<String, Object> wrapper = new PairWrapper<>();
List<Column> Columns = rowData.getBeforeColumnsList();
Map<String, Object> map = convert2map(Columns);
for (Map.Entry<String, Object> entry : map.entrySet()) {
wrapper.addPair(new Pair<>(entry.getKey(), CharSequence.class.isInstance(entry.getValue()) ? entry.getValue().toString() : entry.getValue()));
}
return wrapper;
}
示例4
public static List<Column> getFinalColumns(EventType type, RowData rowData) {
List<Column> Columns = null;
if (type == EventType.INSERT || type == EventType.UPDATE) {
Columns = rowData.getAfterColumnsList();
} else if (type == EventType.DELETE) {
Columns = rowData.getBeforeColumnsList();
}
return Columns;
}
示例5
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
List<Column> columns = rowData.getAfterColumnsList();
String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
logger.warn("update_column_find_null_warn update从column中找不到主键,database=" + database + ",table=" + table);
return;
}
logger.debug("update_column_id_info update主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
Map<String, Object> dataMap = parseColumnsToMap(columns);
elasticsearchService.update(index, type, idColumn.getValue(), dataMap);
logger.debug("update_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + dataMap);
}
示例6
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
List<Column> columns = rowData.getAfterColumnsList();
String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table);
return;
}
logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
Map<String, Object> dataMap = parseColumnsToMap(columns);
elasticsearchService.insertById(index, type, idColumn.getValue(), dataMap);
logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + JsonUtil.toJson(dataMap));
}
示例7
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
List<Column> columns = rowData.getBeforeColumnsList();
String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table);
return;
}
logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
elasticsearchService.deleteById(index, type, idColumn.getValue());
logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
}
示例8
private int checkLoopback(RowData rowData) {
return 0;//TODO 回环数据判断,支持双向同步时进行改造
}
示例9
private List<RdbEventRecord> internParse(Entry entry) {
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new DatalinkException("parser of mysql-event has an error , data:" + entry.toString(), e);
}
if (rowChange == null) {
return null;
}
EventType eventType = EventType.valueOf(rowChange.getEventType().name());
if (eventType.isQuery()) {
return null;
}
if (eventType.isDdl()) {
logger.info("find a ddl event in MessageParser , ddl sql is {}.", rowChange.getSql());
MediaService mediaService = DataLinkFactory.getObject(MediaService.class);
MediaSourceInfo mediaSourceInfo = mediaService.getMediaSourceById(readerParameter.getMediaSourceId());
DbDialect dbDialect = DbDialectFactory.getDbDialect(mediaSourceInfo);
dbDialect.reloadTable(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
if (readerParameter.isDdlSync()) {
logger.info("The value of the ddl-sync is true , prepare to parse ddl event.");
RdbEventRecord record = new RdbEventRecord();
record.setSchemaName(entry.getHeader().getSchemaName());
record.setTableName(entry.getHeader().getTableName());
record.setEventType(eventType);
record.setExecuteTime(entry.getHeader().getExecuteTime());
record.setSql(rowChange.getSql());
record.setDdlSchemaName(rowChange.getDdlSchemaName());
return Arrays.asList(record);
} else {
logger.info("The value of the ddl-sync is false , the ddl event is ignored.");
return null;
}
}
List<RdbEventRecord> eventDatas = new ArrayList<>();
for (RowData rowData : rowChange.getRowDatasList()) {
RdbEventRecord eventData = internParse(entry, rowChange, rowData);
if (eventData != null) {
eventDatas.add(eventData);
}
}
return eventDatas;
}
示例10
public List<RowData> getRowDataLst() {
return rowDataLst;
}
示例11
public void setRowDataLst(List<RowData> rowDataLst) {
this.rowDataLst = rowDataLst;
}
示例12
protected abstract void doSync(String database, String table, String index, String type, RowData rowData);