Java源码示例:com.alibaba.otter.canal.store.model.Event
示例1
/**
 * Decides whether an event may pass, adding transaction-state tracking on top of the parent check.
 *
 * <p>Per the original inline notes, {@code txState == 1} marks an open transaction and
 * {@code txState == 2} marks "non-transaction protection"; both are entered from {@code 0}
 * through a compare-and-set so only one caller wins the transition.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Inside a transaction: let it through, the transaction header has already been checked.
    if (txState.intValue() == 1 && inTransaction.get()) {
        return true;
    }

    // Any state other than 0 at this point blocks the event.
    if (txState.intValue() != 0) {
        return false;
    }

    if (!super.isPermit(event, state)) {
        return false;
    }

    // The first event delivered is not necessarily a Begin, so both cases are handled.
    if (isTransactionBegin(event)) {
        if (txState.compareAndSet(0, 1)) {
            inTransaction.set(true);
            return true; // transaction admitted
        }
        return false;
    }

    // Non-transactional event (DDL/DCL). When starting from a zk cursor the first event
    // received can also be a TransactionEnd, which is admitted through this path as well.
    return txState.compareAndSet(0, 2);
}
示例2
/**
 * Puts one event into a store configured with {@code BatchMode.MEMSIZE} and checks that a
 * single {@code tryGet} from the first position returns exactly that event, with the start
 * and end of the returned position range both equal to the first position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.setBatchMode(BatchMode.MEMSIZE);
    eventStore.start();
    try {
        Assert.assertTrue(eventStore.tryPut(buildEvent("1", 1L, 1L)));

        Position position = eventStore.getFirstPosition();
        Events<Event> fetched = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, fetched.getEvents().size());
        Assert.assertEquals(position, fetched.getPositionRange().getStart());
        Assert.assertEquals(position, fetched.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion above fails.
        eventStore.stop();
    }
}
示例3
/**
 * Puts one event into a store using its default batch mode and checks that a single
 * {@code tryGet} from the first position returns exactly that event, with the start and end
 * of the returned position range both equal to the first position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.start();
    try {
        Assert.assertTrue(eventStore.tryPut(buildEvent("1", 1L, 1L)));

        Position position = eventStore.getFirstPosition();
        Events<Event> fetched = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, fetched.getEvents().size());
        Assert.assertEquals(position, fetched.getPositionRange().getStart());
        Assert.assertEquals(position, fetched.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion above fails.
        eventStore.stop();
    }
}
示例4
/**
 * Prints every event of the batch to standard output, framed by two marker lines, and always
 * reports success.
 *
 * <p>Transaction begin/end events print only their entry type. Every other event prints a
 * line built from the {@code messgae} pattern with the current thread name, journal name,
 * position and the execute time formatted with {@code DATE_FORMAT}.
 *
 * @param datas events to print; must not be {@code null}
 * @return always {@code true}
 * @throws CanalStoreException declared by the signature; not thrown by this body
 */
public boolean tryPut(List<Event> datas) throws CanalStoreException {
    System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    // One formatter serves the whole batch; it is method-local, so it is never shared across threads.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The execute time is only needed for the detailed line.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                new Object[] { Thread.currentThread().getName(), data.getJournalName(),
                        String.valueOf(data.getPosition()), format.format(date) }));
        }
    }
    System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
    return true;
}
示例5
/**
 * Fetches events from the store, choosing the retrieval call from {@code timeout}:
 * <ul>
 * <li>{@code null}: {@code eventStore.tryGet(start, batchSize)}</li>
 * <li>{@code <= 0}: {@code eventStore.get(start, batchSize)}</li>
 * <li>{@code > 0}: {@code eventStore.get(start, batchSize, timeout, unit)}</li>
 * </ul>
 *
 * @param eventStore store to read from
 * @param start position to start reading at
 * @param batchSize batch size passed through to the store
 * @param timeout selects the retrieval mode as listed above; may be {@code null}
 * @param unit unit of {@code timeout}; only used when {@code timeout > 0}
 * @return the events returned by the store
 * @throws CanalServerException wrapping any exception raised by the two {@code get} variants
 */
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize);
    }

    try {
        if (timeout <= 0) {
            return eventStore.get(start, batchSize);
        }
        return eventStore.get(start, batchSize, timeout, unit);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Wrapping would otherwise hide the interruption; restore the flag so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new CanalServerException(e);
    }
}
示例6
/**
 * Prints every event of the batch to standard output.
 *
 * <p>Transaction begin/end events print only their entry type. Every other event prints a
 * line built from the {@code messgae} pattern with the current thread name, journal name,
 * position and the execute time formatted with {@code DATE_FORMAT}.
 *
 * @param datas events to print; must not be {@code null}
 * @throws InterruptedException declared by the signature; not thrown by this body
 * @throws CanalStoreException declared by the signature; not thrown by this body
 */
public void put(List<Event> datas) throws InterruptedException, CanalStoreException {
    // One formatter serves the whole batch; it is method-local, so it is never shared across threads.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The execute time is only needed for the detailed line.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                new Object[] { Thread.currentThread().getName(), data.getJournalName(),
                        String.valueOf(data.getPosition()), format.format(date) }));
        }
    }
}
示例7
/**
 * Prints every event of the batch to standard output and always reports success. The
 * timeout arguments are not used by this implementation.
 *
 * <p>Transaction begin/end events print only their entry type. Every other event prints a
 * line built from the {@code messgae} pattern with the current thread name, journal name,
 * position and the execute time formatted with {@code DATE_FORMAT}.
 *
 * @param datas events to print; must not be {@code null}
 * @param timeout ignored
 * @param unit ignored
 * @return always {@code true}
 * @throws InterruptedException declared by the signature; not thrown by this body
 * @throws CanalStoreException declared by the signature; not thrown by this body
 */
public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    // One formatter serves the whole batch; it is method-local, so it is never shared across threads.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The execute time is only needed for the detailed line.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                new Object[] { Thread.currentThread().getName(), data.getJournalName(),
                        String.valueOf(data.getPosition()), format.format(date) }));
        }
    }
    return true;
}
示例8
/**
 * Prints every event of the batch to standard output, framed by two marker lines, and always
 * reports success.
 *
 * <p>Transaction begin/end events print only their entry type. Every other event prints a
 * line built from the {@code messgae} pattern with the current thread name, journal name,
 * position and the execute time formatted with {@code DATE_FORMAT}.
 *
 * @param datas events to print; must not be {@code null}
 * @return always {@code true}
 * @throws CanalStoreException declared by the signature; not thrown by this body
 */
public boolean tryPut(List<Event> datas) throws CanalStoreException {
    System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    // One formatter serves the whole batch; it is method-local, so it is never shared across threads.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The execute time is only needed for the detailed line.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                new Object[] { Thread.currentThread().getName(), data.getJournalName(),
                        String.valueOf(data.getPosition()), format.format(date) }));
        }
    }
    System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
    return true;
}
示例9
/**
 * Builds a {@link LogPosition} describing where the given event sits in the log.
 *
 * <p>The journal name, position, execute time (stored as the timestamp), server id and gtid
 * are copied into a new {@code EntryPosition}; the event's log identity is copied onto the
 * enclosing {@code LogPosition}.
 *
 * @param event event to read the coordinates from
 * @return a newly created log position
 */
public static LogPosition createPosition(Event event) {
    final EntryPosition entryPosition = new EntryPosition();
    entryPosition.setJournalName(event.getJournalName());
    entryPosition.setPosition(event.getPosition());
    entryPosition.setTimestamp(event.getExecuteTime());
    entryPosition.setServerId(event.getServerId()); // serverId was added on 2016-06-28
    entryPosition.setGtid(event.getGtid());

    final LogPosition result = new LogPosition();
    result.setPostion(entryPosition);
    result.setIdentity(event.getLogIdentity());
    return result;
}
示例10
/**
 * Stores the batch, waiting on {@code notFull} until {@code checkFreeSlotAt} accepts the
 * target sequence. A {@code null} or empty batch returns immediately.
 *
 * @param data events to store
 * @throws InterruptedException if interrupted while acquiring the lock or waiting, or if the
 *         thread is found interrupted right after the batch was stored
 * @throws CanalStoreException declared by the signature
 */
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return;
    }

    final ReentrantLock putLock = this.lock;
    putLock.lockInterruptibly();
    try {
        try {
            // Loop until there is room for the whole batch.
            for (;;) {
                if (checkFreeSlotAt(putSequence.get() + data.size())) {
                    break;
                }
                notFull.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to another waiter that has not been interrupted.
            notFull.signal();
            throw interrupted;
        }

        doPut(data);

        // Interruption that arrived during the put is reported after the data is stored.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    } finally {
        putLock.unlock();
    }
}
示例11
/**
 * Stores the batch only if {@code checkFreeSlotAt} accepts the target sequence right now;
 * never waits for space.
 *
 * @param data events to store; a {@code null} or empty batch counts as success
 * @return {@code true} if the batch was stored (or was empty), {@code false} if there was no room
 * @throws CanalStoreException declared by the signature
 */
public boolean tryPut(List<Event> data) throws CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    final ReentrantLock putLock = this.lock;
    putLock.lock();
    try {
        final boolean hasRoom = checkFreeSlotAt(putSequence.get() + data.size());
        if (hasRoom) {
            doPut(data);
        }
        return hasRoom;
    } finally {
        putLock.unlock();
    }
}
示例12
/**
 * Reads a batch starting at {@code start}, waiting on {@code notEmpty} until
 * {@code checkUnGetSlotAt} reports that the batch can be served.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @return the result of {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock getLock = this.lock;
    getLock.lockInterruptibly();
    try {
        try {
            for (;;) {
                if (checkUnGetSlotAt((LogPosition) start, batchSize)) {
                    break;
                }
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to another waiter that has not been interrupted.
            notEmpty.signal();
            throw interrupted;
        }

        return doGet(start, batchSize);
    } finally {
        getLock.unlock();
    }
}
示例13
/**
 * Builds a {@link LogPosition} describing where the given event sits in the log.
 *
 * <p>The journal name, position, execute time (stored as the timestamp), server id and gtid
 * are copied into a new {@code EntryPosition}; the event's log identity is copied onto the
 * enclosing {@code LogPosition}.
 *
 * @param event event to read the coordinates from
 * @return a newly created log position
 */
public static LogPosition createPosition(Event event) {
    final EntryPosition entryPosition = new EntryPosition();
    entryPosition.setJournalName(event.getJournalName());
    entryPosition.setPosition(event.getPosition());
    entryPosition.setTimestamp(event.getExecuteTime());
    entryPosition.setServerId(event.getServerId()); // serverId was added on 2016-06-28
    entryPosition.setGtid(event.getGtid());

    final LogPosition result = new LogPosition();
    result.setPostion(entryPosition);
    result.setIdentity(event.getLogIdentity());
    return result;
}
示例14
/**
 * Puts one event into a store configured with {@code BatchMode.MEMSIZE} and checks that a
 * single {@code tryGet} from the first position returns exactly that event, with the start
 * and end of the returned position range both equal to the first position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.setBatchMode(BatchMode.MEMSIZE);
    eventStore.start();
    try {
        Assert.assertTrue(eventStore.tryPut(buildEvent("1", 1L, 1L)));

        Position position = eventStore.getFirstPosition();
        Events<Event> fetched = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, fetched.getEvents().size());
        Assert.assertEquals(position, fetched.getPositionRange().getStart());
        Assert.assertEquals(position, fetched.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion above fails.
        eventStore.stop();
    }
}
示例15
/**
 * Puts one event into a store using its default batch mode and checks that a single
 * {@code tryGet} from the first position returns exactly that event, with the start and end
 * of the returned position range both equal to the first position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.start();
    try {
        Assert.assertTrue(eventStore.tryPut(buildEvent("1", 1L, 1L)));

        Position position = eventStore.getFirstPosition();
        Events<Event> fetched = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, fetched.getEvents().size());
        Assert.assertEquals(position, fetched.getPositionRange().getStart());
        Assert.assertEquals(position, fetched.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion above fails.
        eventStore.stop();
    }
}
示例16
/**
 * Decides whether an event may pass, adding transaction-state tracking on top of the parent check.
 *
 * <p>Per the original inline notes, {@code txState == 1} marks an open transaction and
 * {@code txState == 2} marks "non-transaction protection"; both are entered from {@code 0}
 * through a compare-and-set so only one caller wins the transition.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Inside a transaction: let it through, the transaction header has already been checked.
    if (txState.intValue() == 1 && inTransaction.get()) {
        return true;
    }

    // Any state other than 0 at this point blocks the event.
    if (txState.intValue() != 0) {
        return false;
    }

    if (!super.isPermit(event, state)) {
        return false;
    }

    // The first event delivered is not necessarily a Begin, so both cases are handled.
    if (isTransactionBegin(event)) {
        if (txState.compareAndSet(0, 1)) {
            inTransaction.set(true);
            return true; // transaction admitted
        }
        return false;
    }

    // Non-transactional event such as DDL/DCL: admitted if this caller claims state 2.
    return txState.compareAndSet(0, 2);
}
示例17
/**
 * Passes each event through the barrier and forwards data to the parent sink.
 *
 * <p>With {@code filterTransactionEntry} set, every event is forwarded on its own as soon as
 * it clears the barrier. Otherwise the whole list is forwarded once, when the last event has
 * cleared the barrier. The barrier entry for an event is always cleared afterwards.
 *
 * <p>NOTE(review): in filter mode the per-event results of {@code super.doSink} are discarded
 * and the method ends up returning {@code false}; an empty list also returns {@code false}.
 * Confirm with callers whether that is intended.
 *
 * @param events events to sink
 * @return the result of {@code super.doSink(events)} in batch mode; {@code false} on
 *         interruption and in the cases noted above
 */
protected boolean doSink(List<Event> events) {
    final int lastIndex = events.size() - 1;
    for (int idx = 0; idx < events.size(); idx++) {
        final Event current = events.get(idx);
        try {
            // Timeline merge scheduling: wait until this event is allowed to proceed.
            barrier.await(current);

            if (filterTransactionEntry) {
                super.doSink(Arrays.asList(current));
                continue;
            }

            if (idx == lastIndex) {
                // For transactional data, sink only after the final event has passed, to keep
                // the batch atomic. The write must also happen before the final event releases
                // its barrier state (done in finally), otherwise there is a concurrency problem.
                return super.doSink(events);
            }
        } catch (InterruptedException interrupted) {
            return false;
        } finally {
            barrier.clear(current);
        }
    }
    return false;
}
示例18
/**
 * Decides whether an event may pass, adding transaction-state tracking on top of the parent check.
 *
 * <p>Per the original inline notes, {@code txState == 1} marks an open transaction and
 * {@code txState == 2} marks "non-transaction protection"; both are entered from {@code 0}
 * through a compare-and-set so only one caller wins the transition.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Inside a transaction: let it through, the transaction header has already been checked.
    if (txState.intValue() == 1 && inTransaction.get()) {
        return true;
    }

    // Any state other than 0 at this point blocks the event.
    if (txState.intValue() != 0) {
        return false;
    }

    if (!super.isPermit(event, state)) {
        return false;
    }

    // The first event delivered is not necessarily a Begin, so both cases are handled.
    if (isTransactionBegin(event)) {
        if (txState.compareAndSet(0, 1)) {
            inTransaction.set(true);
            return true; // transaction admitted
        }
        return false;
    }

    // Non-transactional event (DDL/DCL). When starting from a zk cursor the first event
    // received can also be a TransactionEnd, which is admitted through this path as well.
    return txState.compareAndSet(0, 2);
}
示例19
/**
 * Passes each event through the barrier and forwards data to the parent sink.
 *
 * <p>With {@code filterTransactionEntry} set, every event is forwarded on its own as soon as
 * it clears the barrier. Otherwise the whole list is forwarded once, when the last event has
 * cleared the barrier. The barrier entry for an event is always cleared afterwards.
 *
 * <p>NOTE(review): in filter mode the per-event results of {@code super.doSink} are discarded
 * and the method ends up returning {@code false}; an empty list also returns {@code false}.
 * Confirm with callers whether that is intended.
 *
 * @param events events to sink
 * @return the result of {@code super.doSink(events)} in batch mode; {@code false} on
 *         interruption and in the cases noted above
 */
protected boolean doSink(List<Event> events) {
    final int lastIndex = events.size() - 1;
    for (int idx = 0; idx < events.size(); idx++) {
        final Event current = events.get(idx);
        try {
            // Timeline merge scheduling: wait until this event is allowed to proceed.
            barrier.await(current);

            if (filterTransactionEntry) {
                super.doSink(Arrays.asList(current));
                continue;
            }

            if (idx == lastIndex) {
                // For transactional data, sink only after the final event has passed, to keep
                // the batch atomic. The write must also happen before the final event releases
                // its barrier state (done in finally), otherwise there is a concurrency problem.
                return super.doSink(events);
            }
        } catch (InterruptedException interrupted) {
            return false;
        } finally {
            barrier.clear(current);
        }
    }
    return false;
}
示例20
/**
 * Reads a batch starting at {@code start}, waiting on {@code notEmpty} until
 * {@code checkUnGetSlotAt} reports that the batch can be served.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @return the result of {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock getLock = this.lock;
    getLock.lockInterruptibly();
    try {
        try {
            for (;;) {
                if (checkUnGetSlotAt((LogPosition) start, batchSize)) {
                    break;
                }
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to another waiter that has not been interrupted.
            notEmpty.signal();
            throw interrupted;
        }

        return doGet(start, batchSize);
    } finally {
        getLock.unlock();
    }
}
示例21
/**
 * Fetches events from the store, choosing the retrieval call from {@code timeout}:
 * <ul>
 * <li>{@code null}: {@code eventStore.tryGet(start, batchSize)}</li>
 * <li>{@code <= 0}: {@code eventStore.get(start, batchSize)}</li>
 * <li>{@code > 0}: {@code eventStore.get(start, batchSize, timeout, unit)}</li>
 * </ul>
 *
 * @param eventStore store to read from
 * @param start position to start reading at
 * @param batchSize batch size passed through to the store
 * @param timeout selects the retrieval mode as listed above; may be {@code null}
 * @param unit unit of {@code timeout}; only used when {@code timeout > 0}
 * @return the events returned by the store
 * @throws CanalServerException wrapping any exception raised by the two {@code get} variants
 */
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize);
    }

    try {
        if (timeout <= 0) {
            return eventStore.get(start, batchSize);
        }
        return eventStore.get(start, batchSize, timeout, unit);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Wrapping would otherwise hide the interruption; restore the flag so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new CanalServerException(e);
    }
}
示例22
/**
 * Stores the batch, waiting on {@code notFull} until {@code checkFreeSlotAt} accepts the
 * target sequence. A {@code null} or empty batch returns immediately.
 *
 * @param data events to store
 * @throws InterruptedException if interrupted while acquiring the lock or waiting, or if the
 *         thread is found interrupted right after the batch was stored
 * @throws CanalStoreException declared by the signature
 */
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return;
    }

    final ReentrantLock putLock = this.lock;
    putLock.lockInterruptibly();
    try {
        try {
            // Loop until there is room for the whole batch.
            for (;;) {
                if (checkFreeSlotAt(putSequence.get() + data.size())) {
                    break;
                }
                notFull.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to another waiter that has not been interrupted.
            notFull.signal();
            throw interrupted;
        }

        doPut(data);

        // Interruption that arrived during the put is reported after the data is stored.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    } finally {
        putLock.unlock();
    }
}
示例23
/**
 * Prints every event of the batch to standard output and always reports success. The
 * timeout arguments are not used by this implementation.
 *
 * <p>Transaction begin/end events print only their entry type. Every other event prints a
 * line built from the {@code messgae} pattern with the current thread name, journal name,
 * position and the execute time formatted with {@code DATE_FORMAT}.
 *
 * @param datas events to print; must not be {@code null}
 * @param timeout ignored
 * @param unit ignored
 * @return always {@code true}
 * @throws InterruptedException declared by the signature; not thrown by this body
 * @throws CanalStoreException declared by the signature; not thrown by this body
 */
public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    // One formatter serves the whole batch; it is method-local, so it is never shared across threads.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The execute time is only needed for the detailed line.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                new Object[] { Thread.currentThread().getName(), data.getJournalName(),
                        String.valueOf(data.getPosition()), format.format(date) }));
        }
    }
    return true;
}
示例24
/**
 * Delegates to the parent {@code await} and calls {@code reset()} before rethrowing if the
 * wait is interrupted.
 *
 * @param event the event to wait for
 * @throws InterruptedException rethrown unchanged after {@code reset()} has run
 */
public void await(Event event) throws InterruptedException {
    try {
        super.await(event);
    } catch (InterruptedException interrupted) {
        // The thread was interrupted, possibly by a shutdown or a master/standby switch.
        // After a switch the transaction end is never delivered normally, so the state is
        // forced back to "transaction finished" to let the other queues through.
        reset();
        throw interrupted;
    }
}
示例25
/**
 * Delegates to the parent timed {@code await} and calls {@code reset()} before rethrowing if
 * the wait is interrupted. A {@code TimeoutException} from the parent propagates untouched.
 *
 * @param event the event to wait for
 * @param timeout timeout value forwarded to the parent
 * @param unit unit of {@code timeout}
 * @throws InterruptedException rethrown unchanged after {@code reset()} has run
 * @throws TimeoutException if the parent call reports a timeout
 */
public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    try {
        super.await(event, timeout, unit);
    } catch (InterruptedException interrupted) {
        // The thread was interrupted, possibly by a shutdown or a master/standby switch.
        // After a switch the transaction end is never delivered normally, so the state is
        // forced back to "transaction finished" to let the other queues through.
        reset();
        throw interrupted;
    }
}
示例26
/**
 * Waits until this event's timestamp is permitted to pass, giving up after the timeout.
 *
 * <p>The timeout is a total budget for the call: time spent in earlier waits (including
 * spurious or unrelated wake-ups) is deducted before waiting again.
 *
 * @param event the event waiting for permission
 * @param timeout maximum total time to wait
 * @param unit unit of {@code timeout}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws TimeoutException if the event is still not permitted when the budget is used up
 */
public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    long timestamp = getTimestamp(event);
    long remainingNanos = unit.toNanos(timeout);

    // Acquire outside the try block: if acquisition itself is interrupted the lock is not
    // held, and unlocking it in finally would throw IllegalMonitorStateException.
    lock.lockInterruptibly();
    try {
        single(timestamp);
        while (isPermit(event, timestamp) == false) {
            if (remainingNanos <= 0) {
                throw new TimeoutException("await timed out after " + timeout + " " + unit);
            }
            // awaitNanos returns the time left, so repeated wake-ups cannot extend the wait.
            remainingNanos = condition.awaitNanos(remainingNanos);
        }
    } finally {
        lock.unlock();
    }
}
示例27
/**
 * Updates the row counter and the execute-time gauge that belong to the given operation.
 *
 * <p>The row delta is the sum of {@code getRowsCount()} over all events; the execute time is
 * the first positive {@code getExecuteTime()} found. The counter is always incremented (by
 * zero for a {@code null} or empty list); the gauge is only written when a positive execute
 * time was found. Operations other than PUT, GET and ACK update nothing.
 *
 * @param events events just handled; may be {@code null} or empty
 * @param op the operation whose metrics are updated
 */
private void profiling(List<Event> events, OP op) {
    long firstExecTime = 0L;
    int rowTotal = 0;
    if (events != null && !events.isEmpty()) {
        for (Event current : events) {
            if (firstExecTime == 0) {
                if (current.getExecuteTime() > 0) {
                    firstExecTime = current.getExecuteTime();
                }
            }
            rowTotal += current.getRowsCount();
        }
    }

    final boolean hasExecTime = firstExecTime > 0;
    switch (op) {
        case PUT:
            putTableRows.addAndGet(rowTotal);
            if (hasExecTime) {
                putExecTime.lazySet(firstExecTime);
            }
            break;
        case GET:
            getTableRows.addAndGet(rowTotal);
            if (hasExecTime) {
                getExecTime.lazySet(firstExecTime);
            }
            break;
        case ACK:
            ackTableRows.addAndGet(rowTotal);
            if (hasExecTime) {
                ackExecTime.lazySet(firstExecTime);
            }
            break;
        default:
            break;
    }
}
示例28
/**
 * Starts the store: runs the parent start logic, validates {@code bufferSize}, then sets up
 * the index mask and the entry array.
 *
 * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of 2
 * @throws CanalStoreException declared by the signature
 */
public void start() throws CanalStoreException {
    super.start();
    // bitCount alone is not enough: Integer.MIN_VALUE has exactly one bit set but is negative
    // and would only fail later at array allocation.
    if (bufferSize <= 0 || Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    // With a power-of-two size, (bufferSize - 1) has all lower bits set.
    indexMask = bufferSize - 1;
    entries = new Event[bufferSize];
}
示例29
/**
 * Stores the batch, waiting on {@code notFull} for at most the given time until
 * {@code checkFreeSlotAt} accepts the target sequence.
 *
 * @param data events to store; a {@code null} or empty batch counts as success
 * @param timeout maximum time to wait for space
 * @param unit unit of {@code timeout}
 * @return {@code true} if the batch was stored (or was empty), {@code false} if the time ran out
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 * @throws CanalStoreException declared by the signature
 */
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock putLock = this.lock;
    putLock.lockInterruptibly();
    try {
        while (!checkFreeSlotAt(putSequence.get() + data.size())) {
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // awaitNanos hands back the time still left in the budget.
                remainingNanos = notFull.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to another waiter that has not been interrupted.
                notFull.signal();
                throw interrupted;
            }
        }

        doPut(data);
        return true;
    } finally {
        putLock.unlock();
    }
}
示例30
/**
 * Reads a batch starting at {@code start}, waiting on {@code notEmpty} for at most the given
 * time until {@code checkUnGetSlotAt} reports that the batch can be served. When the time
 * runs out, {@code doGet} is still called and returns whatever is available.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @param timeout maximum time to wait for a full batch
 * @param unit unit of {@code timeout}
 * @return the result of {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,
                                                                                     CanalStoreException {
    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock getLock = this.lock;
    getLock.lockInterruptibly();
    try {
        // Stop waiting when the batch is ready or the time budget is exhausted.
        while (!checkUnGetSlotAt((LogPosition) start, batchSize) && remainingNanos > 0) {
            try {
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to another waiter that has not been interrupted.
                notEmpty.signal();
                throw interrupted;
            }
        }

        // Either the batch is ready, or the time is up and we take as many as there are.
        return doGet(start, batchSize);
    } finally {
        getLock.unlock();
    }
}