Java源码示例:com.alibaba.otter.canal.protocol.CanalEntry.RowData

示例1
public static <T extends Object> PairWrapper<String, Object> convertProtobufRecord(EntryHeader header, RowData rowData) {
    PairWrapper<String, Object> wrapper = new PairWrapper<>();
    List<Column> Columns = null;
    if (header.isInsert() || header.isUpdate()) {
        Columns = rowData.getAfterColumnsList();
    } else if (header.isDelete()) {
        Columns = rowData.getBeforeColumnsList();
    }
    wrapper.addProperties(Constants.MessageBodyKey.POS, header.getPos());
    wrapper.addProperties(Constants.MessageBodyKey.OP_TS, header.getTsTime());

    Map<String, Object> map = convert2map(Columns);
    for (Map.Entry<String, Object> entry : map.entrySet()) {
        wrapper.addPair(new Pair<>(entry.getKey(), CharSequence.class.isInstance(entry.getValue()) ? entry.getValue().toString() : entry.getValue()));
    }

    return wrapper;
}
 
示例2
/**
 * 创建DBusMessageBuilder对象,同时生成ums schema
 */
private DbusMessageBuilder createBuilderWithSchema(MetaVersion version, MessageEntry msgEntry) {
    DbusMessageBuilder builder = new DbusMessageBuilder();
    String namespace;
    if (outputTablePartition()) {
        namespace = builder.buildNameSpace(Utils.getDataSourceNamespace(),
                msgEntry.getEntryHeader().getSchemaName(), msgEntry.getEntryHeader().getTableName(),
                version.getVersion(), msgEntry.getEntryHeader().getPartitionTableName());
    } else {
        namespace = builder.buildNameSpace(Utils.getDataSourceNamespace(),
                msgEntry.getEntryHeader().getSchemaName(), msgEntry.getEntryHeader().getTableName(),
                version.getVersion());
    }
    builder.build(DbusMessage.ProtocolType.DATA_INCREMENT_DATA, namespace, table.getBatchId());
    EventType eventType = msgEntry.getEntryHeader().getOperType();
    RowData rowData = msgEntry.getMsgColumn().getRowDataLst().get(0);
    List<Column> columns = Support.getFinalColumns(eventType, rowData);
    for (Column column : columns) {
        String colType = Support.getColumnType(column);
        // 这里避免,源端表中包含ums_id_/ums_ts_等字段
        if (Support.isSupported(colType) && !builder.getMessage().containsFiled(column.getName())) {
            builder.appendSchema(column.getName(), DataType.convertMysqlDataType(colType), true);
        }
    }
    return builder;
}
 
示例3
public static <T extends Object> PairWrapper<String, Object> convertProtobufRecordBeforeUpdate(EntryHeader header, RowData rowData) {
    PairWrapper<String, Object> wrapper = new PairWrapper<>();
    List<Column> Columns = rowData.getBeforeColumnsList();
    Map<String, Object> map = convert2map(Columns);
    for (Map.Entry<String, Object> entry : map.entrySet()) {
        wrapper.addPair(new Pair<>(entry.getKey(), CharSequence.class.isInstance(entry.getValue()) ? entry.getValue().toString() : entry.getValue()));
    }

    return wrapper;
}
 
示例4
public static List<Column> getFinalColumns(EventType type, RowData rowData) {
    List<Column> Columns = null;
    if (type == EventType.INSERT || type == EventType.UPDATE) {
        Columns = rowData.getAfterColumnsList();
    } else if (type == EventType.DELETE) {
        Columns = rowData.getBeforeColumnsList();
    }
    return Columns;
}
 
示例5
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
    List<Column> columns = rowData.getAfterColumnsList();
    String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
    Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
    if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
        logger.warn("update_column_find_null_warn update从column中找不到主键,database=" + database + ",table=" + table);
        return;
    }
    logger.debug("update_column_id_info update主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
    Map<String, Object> dataMap = parseColumnsToMap(columns);
    elasticsearchService.update(index, type, idColumn.getValue(), dataMap);
    logger.debug("update_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + dataMap);
}
 
示例6
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
    List<Column> columns = rowData.getAfterColumnsList();
    String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
    Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
    if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
        logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table);
        return;
    }
    logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
    Map<String, Object> dataMap = parseColumnsToMap(columns);
    elasticsearchService.insertById(index, type, idColumn.getValue(), dataMap);
    logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + JsonUtil.toJson(dataMap));
}
 
示例7
@Override
protected void doSync(String database, String table, String index, String type, RowData rowData) {
    List<Column> columns = rowData.getBeforeColumnsList();
    String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
    Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
    if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
        logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table);
        return;
    }
    logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
    elasticsearchService.deleteById(index, type, idColumn.getValue());
    logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
}
 
示例8
private int checkLoopback(RowData rowData) {
    return 0;//TODO 回环数据判断,支持双向同步时进行改造
}
 
示例9
private List<RdbEventRecord> internParse(Entry entry) {
    RowChange rowChange;
    try {
        rowChange = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new DatalinkException("parser of mysql-event has an error , data:" + entry.toString(), e);
    }

    if (rowChange == null) {
        return null;
    }

    EventType eventType = EventType.valueOf(rowChange.getEventType().name());
    if (eventType.isQuery()) {
        return null;
    }

    if (eventType.isDdl()) {
        logger.info("find a ddl event in MessageParser , ddl sql is {}.", rowChange.getSql());

        MediaService mediaService = DataLinkFactory.getObject(MediaService.class);
        MediaSourceInfo mediaSourceInfo = mediaService.getMediaSourceById(readerParameter.getMediaSourceId());

        DbDialect dbDialect = DbDialectFactory.getDbDialect(mediaSourceInfo);
        dbDialect.reloadTable(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());

        if (readerParameter.isDdlSync()) {
            logger.info("The value of the ddl-sync is true , prepare to parse ddl event.");

            RdbEventRecord record = new RdbEventRecord();
            record.setSchemaName(entry.getHeader().getSchemaName());
            record.setTableName(entry.getHeader().getTableName());
            record.setEventType(eventType);
            record.setExecuteTime(entry.getHeader().getExecuteTime());
            record.setSql(rowChange.getSql());
            record.setDdlSchemaName(rowChange.getDdlSchemaName());
            return Arrays.asList(record);
        } else {
            logger.info("The value of the ddl-sync is false , the ddl event is ignored.");
            return null;
        }
    }


    List<RdbEventRecord> eventDatas = new ArrayList<>();
    for (RowData rowData : rowChange.getRowDatasList()) {
        RdbEventRecord eventData = internParse(entry, rowChange, rowData);
        if (eventData != null) {
            eventDatas.add(eventData);
        }
    }

    return eventDatas;
}
 
示例10
public List<RowData> getRowDataLst() {
    return rowDataLst;
}
 
示例11
public void setRowDataLst(List<RowData> rowDataLst) {
    this.rowDataLst = rowDataLst;
}
 
示例12
protected abstract void doSync(String database, String table, String index, String type, RowData rowData);