Java源码示例:com.github.shyiko.mysql.binlog.event.QueryEventData
示例1
/**
 * Checks that a QUERY-typed connector event carrying a {@link QueryEventData} payload is
 * mapped to a {@code QueryEvent}, and that the mapped event exposes the binlog position,
 * database, server id, timestamp and SQL text expected by the fixture.
 */
@Test
public void testQueryEvent() {
    final String ddl = "CREATE UNIQUE INDEX unique_index ON `my_db`.`my_table` (`col1`, `col2`)";

    // Arrange: mark the shared header as a QUERY event and build its payload.
    eventHeader.setEventType(EventType.QUERY);
    final QueryEventData payload = new QueryEventData();
    payload.setDatabase(DATABASE);
    payload.setSql(ddl);
    final Event rawEvent = new Event(eventHeader, payload);

    // Act: run the raw connector event through the mapper under test.
    final Optional<BinlogEvent> mapped =
            BinaryLogConnectorEventMapper.INSTANCE.map(rawEvent, BINLOG_FILE_POS);

    // Assert: a QueryEvent must come back with every field carried over.
    assertTrue(mapped.isPresent());
    final BinlogEvent result = mapped.get();
    assertTrue(result instanceof QueryEvent);

    final QueryEvent queryEvent = (QueryEvent) result;
    assertEquals(BINLOG_FILE_POS, queryEvent.getBinlogFilePos());
    assertEquals(DATABASE, queryEvent.getDatabase());
    assertEquals(SERVER_ID, queryEvent.getServerId());
    assertEquals(TIMESTAMP, queryEvent.getTimestamp());
    assertEquals(ddl, queryEvent.getSql());
}
示例2
/**
 * Inspects the SQL text of a query event and calls {@code schema.reset()} when
 * {@code createTablePattern} finds a match anywhere in that text.
 *
 * @param event the binlog event; its data is read as {@link QueryEventData}
 */
private void processQueryEvent(Event event) {
    final QueryEventData queryData = event.getData();
    final boolean matchesCreateTable = createTablePattern.matcher(queryData.getSql()).find();
    if (!matchesCreateTable) {
        // Nothing to do for statements the pattern does not recognise.
        return;
    }
    schema.reset();
}
示例3
/**
 * Handles a query event: when DDL publishing is switched on via the system property named
 * by {@code DDLEVENT_ENABLE_KEY} (falling back to {@code DDLEVENT_ENABLE}) and the SQL text
 * contains {@code SUBSCRIBE_SQL_EVENT}, the event data is pushed to the configured Kafka
 * topic. After that, the binary log status is updated from the event header.
 *
 * @param event the binlog event; its data is read as {@link QueryEventData}
 */
@Override
public void handle(Event event) {
    // The switch is re-read from system properties on every call.
    if (Boolean.parseBoolean(System.getProperty(DDLEVENT_ENABLE_KEY, DDLEVENT_ENABLE))) {
        final QueryEventData queryData = event.getData();
        final String statement = queryData.getSql();
        final boolean subscribed = statement.contains(SUBSCRIBE_SQL_EVENT);
        if (subscribed) {
            log.info("数据库:{}发生alter table事件", queryData.getDatabase());
            // Topic is resolved after logging, with DEFAULT_TOPIC as the fallback.
            dataPublisher.pushToKafka(
                    System.getProperty(DEFAULT_TOPIC_KEY, DEFAULT_TOPIC), queryData);
        }
    }

    // Runs whether or not anything was published above.
    updateBinaryLogStatus(event.getHeader());
}
示例4
/**
 * Dispatches an incoming binlog event according to the event type in its header.
 *
 * <ul>
 *   <li>{@code TABLE_MAP}: forwarded to {@code handleTableMappingEvent}.</li>
 *   <li>Write / update / delete row events (pre-GA, regular and extended variants):
 *       forwarded to {@code handleRowEvent} together with the table id read from the
 *       event data.</li>
 *   <li>{@code QUERY}: calls {@code finishTx()} when {@code isCommit} accepts the SQL;
 *       otherwise calls {@code schemaRepository.evictAll()} when
 *       {@code isSchemaChangeQuery} accepts it.</li>
 *   <li>{@code XID}: calls {@code finishTx()}.</li>
 *   <li>{@code GTID}: stores the client's GTID set and the event's GTID, and resets the
 *       per-transaction event sequence number to zero.</li>
 *   <li>Anything else is ignored.</li>
 * </ul>
 *
 * The current binlog file name is refreshed from the client before dispatching.
 *
 * @param event the event received from the binlog client
 */
@Override
public void onEvent(Event event) {
    LOG.trace("Received event {}", event);
    final EventType type = event.getHeader().getEventType();
    currentBinLogFileName = client.getBinlogFilename();

    switch (type) {
        case TABLE_MAP: {
            final TableMapEventData tableMap = event.getData();
            handleTableMappingEvent(tableMap);
            break;
        }
        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS: {
            final WriteRowsEventData inserted = event.getData();
            handleRowEvent(event, inserted.getTableId());
            break;
        }
        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS: {
            final UpdateRowsEventData updated = event.getData();
            handleRowEvent(event, updated.getTableId());
            break;
        }
        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS: {
            final DeleteRowsEventData deleted = event.getData();
            handleRowEvent(event, deleted.getTableId());
            break;
        }
        case QUERY: {
            final String statement = event.<QueryEventData>getData().getSql();
            if (isCommit(statement)) {
                finishTx();
            } else if (isSchemaChangeQuery(statement)) {
                schemaRepository.evictAll();
            }
            break;
        }
        case XID: {
            finishTx();
            break;
        }
        case GTID: {
            final GtidEventData gtidData = event.getData();
            currentGtidSet = client.getGtidSet();
            currentTxGtid = gtidData.getGtid();
            currentTxEventSeqNo = 0;
            LOG.trace("Started new tx, gtid: {}", currentTxGtid);
            break;
        }
        default: {
            // ignore
            break;
        }
    }
}
示例5
/**
 * Entry point for events delivered by the binlog client. Logs the event, refreshes the
 * current binlog file name from the client, then routes the event by its header type:
 * table-map events go to {@code handleTableMappingEvent}, row events go to
 * {@code handleRowEvent} with their table id, query events go to
 * {@link #applyQueryStatement(QueryEventData)}, XID events finish the transaction, and
 * GTID events go to {@link #beginGtidTransaction(GtidEventData)}. Other types are ignored.
 *
 * @param event the event received from the binlog client
 */
@Override
public void onEvent(Event event) {
    LOG.trace("Received event {}", event);
    EventType headerType = event.getHeader().getEventType();
    currentBinLogFileName = client.getBinlogFilename();

    switch (headerType) {
        case TABLE_MAP:
            handleTableMappingEvent((TableMapEventData) event.getData());
            break;

        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS:
            WriteRowsEventData writeRows = event.getData();
            handleRowEvent(event, writeRows.getTableId());
            break;

        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS:
            UpdateRowsEventData updateRows = event.getData();
            handleRowEvent(event, updateRows.getTableId());
            break;

        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS:
            DeleteRowsEventData deleteRows = event.getData();
            handleRowEvent(event, deleteRows.getTableId());
            break;

        case QUERY:
            applyQueryStatement(event.<QueryEventData>getData());
            break;

        case XID:
            finishTx();
            break;

        case GTID:
            beginGtidTransaction(event.<GtidEventData>getData());
            break;

        default:
            // ignore
            break;
    }
}

/**
 * Reacts to a query event: finishes the transaction when {@code isCommit} accepts the SQL,
 * otherwise evicts all entries from {@code schemaRepository} when
 * {@code isSchemaChangeQuery} accepts it.
 */
private void applyQueryStatement(QueryEventData queryData) {
    String statement = queryData.getSql();
    if (isCommit(statement)) {
        finishTx();
        return;
    }
    if (isSchemaChangeQuery(statement)) {
        schemaRepository.evictAll();
    }
}

/**
 * Records the state for a transaction announced by a GTID event: the client's GTID set,
 * the event's GTID, and a sequence number reset to zero.
 */
private void beginGtidTransaction(GtidEventData gtidData) {
    currentGtidSet = client.getGtidSet();
    currentTxGtid = gtidData.getGtid();
    currentTxEventSeqNo = 0;
    LOG.trace("Started new tx, gtid: {}", currentTxGtid);
}