Java源码示例:org.apache.kudu.client.KuduScanToken
示例1
/**
 * Plans a scan over {@code table} and returns the scan tokens produced by the Kudu client.
 *
 * @param tableFilters filters turned into Kudu predicates against the table schema; skipped when
 *                     {@code CollectionUtils.isNotEmpty} reports nothing to apply
 * @param tableProjections column names to project; no projection is configured when {@code null}
 * @param rowLimit row limit handed to the token builder; ignored when {@code null} or not positive
 * @return the scan tokens built for the configured projection, predicates and limit
 */
public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
    KuduScanToken.KuduScanTokenBuilder scanPlan = client.newScanTokenBuilder(table);

    if (tableProjections != null) {
        scanPlan.setProjectedColumnNames(tableProjections);
    }

    if (CollectionUtils.isNotEmpty(tableFilters)) {
        // Convert and register the filters one by one, in list order.
        for (KuduFilterInfo filterInfo : tableFilters) {
            scanPlan.addPredicate(filterInfo.toPredicate(table.getSchema()));
        }
    }

    boolean hasPositiveLimit = rowLimit != null && rowLimit > 0;
    if (hasPositiveLimit) {
        scanPlan.limit(rowLimit);
    }

    return scanPlan.build();
}
示例2
/**
 * Creates the Kudu client and the scanner for this reader, then advances to the first record.
 *
 * <p>When the source carries a serialized scan token the scanner is restored from it; otherwise
 * the table named in the spec is opened and a scanner is configured from the spec.
 *
 * @return whatever {@code advance()} reports after the scanner has been set up
 * @throws IOException propagated from the Kudu client calls or from {@code advance()}
 */
@Override
public boolean start() throws IOException {
    LOG.debug("Starting Kudu reader");

    AsyncKuduClient.AsyncKuduClientBuilder clientBuilder =
            new AsyncKuduClient.AsyncKuduClientBuilder(source.spec.getMasterAddresses());
    client = clientBuilder.build().syncClient();

    if (source.serializedToken == null) {
        // No token yet: build a scanner over the table directly from the read spec.
        KuduTable table = client.openTable(source.spec.getTable());
        KuduScanner.KuduScannerBuilder scannerBuilder =
                table.getAsyncClient().syncClient().newScannerBuilder(table);
        configureBuilder(source.spec, table.getSchema(), scannerBuilder);
        scanner = scannerBuilder.build();
    } else {
        // tokens available if the source is already split
        scanner = KuduScanToken.deserializeIntoScanner(source.serializedToken, client);
    }

    return advance();
}
示例3
/**
 * Builds a set of scan tokens. The list of scan tokens is generated as if the entire table is being scanned,
 * i.e. a SELECT * FROM TABLE equivalent expression. This list is used to assign the partition pie assignments
 * for all of the planned partitions of operators. Each operator gets a part of the PIE as if all columns were
 * selected. Subsequently when a query is to be processed, the query is used to generate the scan tokens applicable
 * for that query. Given that the partition pie represents the entire data set, the scan assignments for the current
 * query will be a subset.
 *
 * <p>The connection built for planning is always closed before this method returns, including when
 * planning the tokens fails.
 *
 * @return The list of scan tokens as if the entire table is getting scanned.
 * @throws Exception propagated from building the connection, planning the scan tokens, or closing the connection
 *     to the kudu cluster.
 */
public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception
{
  // We are not using the current query for deciding the partition strategy but a SELECT * as
  // we do not want to optimize on just the current query. This prevents rapid throttling of operator
  // instances when the scan patterns are erratic. On the other hand, this might result in under utilized
  // operator resources in the DAG but will be consistent at a minimum.
  ApexKuduConnection apexKuduConnection = prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
  try {
    KuduClient clientHandle = apexKuduConnection.getKuduClient();
    KuduTable table = apexKuduConnection.getKuduTable();
    KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
    List<ColumnSchema> columnList = table.getSchema().getColumns();
    List<String> allColumns = new ArrayList<>(columnList.size());
    for (ColumnSchema column : columnList) {
      allColumns.add(column.getName());
    }
    builder.setProjectedColumnNames(allColumns);
    LOG.debug("Building the partition pie assignments for the input operator");
    return builder.build();
  } finally {
    // Release the planning connection even when token generation throws.
    apexKuduConnection.close();
  }
}
示例4
/**
 * Restores a scanner from the serialized scan token carried by the given split.
 *
 * @param kuduSplit split providing the serialized scan token
 * @return the scanner deserialized from the token using this instance's client
 * @throws RuntimeException wrapping the {@link IOException} raised while deserializing
 */
public KuduScanner createScanner(KuduSplit kuduSplit)
{
    KuduScanner restoredScanner;
    try {
        restoredScanner = KuduScanToken.deserializeIntoScanner(kuduSplit.getSerializedScanToken(), client);
    }
    catch (IOException deserializationFailure) {
        throw new RuntimeException(deserializationFailure);
    }
    return restoredScanner;
}
示例5
/**
 * Wraps a scan token into a {@code KuduSplit} by serializing it.
 *
 * @param tableHandle handle passed through to the split
 * @param token scan token to serialize
 * @param primaryKeyColumnCount value passed through to the split
 * @return a split holding the serialized form of {@code token}
 * @throws PrestoException with {@code GENERIC_INTERNAL_ERROR} when serialization raises an {@link IOException}
 */
private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token,
        int primaryKeyColumnCount)
{
    try {
        return new KuduSplit(tableHandle, primaryKeyColumnCount, token.serialize());
    }
    catch (IOException serializationFailure) {
        throw new PrestoException(GENERIC_INTERNAL_ERROR, serializationFailure);
    }
}
示例6
/**
 * Creates one input split per Kudu scan token planned for the configured filters, projections and row limit.
 *
 * <p>Each split carries the serialized token, its position in the token list, and one location string
 * (from {@code getLocation}) per replica of the token's tablet. A warning is logged when fewer splits
 * than {@code minNumSplits} were produced.
 *
 * @param minNumSplits desired minimum number of splits; only used for the warning
 * @return the splits, in token order
 * @throws IOException propagated from the calls made here that declare it
 */
@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    startTableContext();
    Preconditions.checkNotNull(tableContext, "tableContext should not be null");

    List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
    KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];

    int splitNumber = 0;
    for (KuduScanToken token : tokens) {
        // Collect the host locations of every replica before serializing the token.
        List<LocatedTablet.Replica> replicas = token.getTablet().getReplicas();
        String[] hosts = new String[replicas.size()];
        int hostIndex = 0;
        for (LocatedTablet.Replica replica : replicas) {
            hosts[hostIndex++] = getLocation(replica.getRpcHost(), replica.getRpcPort());
        }

        splits[splitNumber] = new KuduInputSplit(token.serialize(), splitNumber, hosts);
        splitNumber++;
    }

    int idleInstances = minNumSplits - splits.length;
    if (idleInstances > 0) {
        LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                splits.length,
                idleInstances
        );
    }

    return splits;
}
示例7
/**
 * Turns the scan tokens planned by the table context into input splits.
 *
 * <p>For every token the replica locations of its tablet are resolved through {@code getLocation}
 * and stored in the split together with the serialized token and the token's index. If the number of
 * splits is below {@code minNumSplits} a warning is logged; the result is returned either way.
 *
 * @param minNumSplits desired minimum number of splits; only used for the warning
 * @return one split per scan token, in token order
 * @throws IOException propagated from the calls made here that declare it
 */
@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    startTableContext();
    Preconditions.checkNotNull(tableContext, "tableContext should not be null");

    List<KuduScanToken> plannedTokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
    final int tokenCount = plannedTokens.size();
    KuduInputSplit[] result = new KuduInputSplit[tokenCount];

    for (int position = 0; position < tokenCount; position++) {
        KuduScanToken current = plannedTokens.get(position);
        List<LocatedTablet.Replica> replicas = current.getTablet().getReplicas();

        List<String> replicaLocations = new ArrayList<>(replicas.size());
        for (LocatedTablet.Replica each : replicas) {
            String location = getLocation(each.getRpcHost(), each.getRpcPort());
            replicaLocations.add(location);
        }

        result[position] = new KuduInputSplit(
                current.serialize(),
                position,
                replicaLocations.toArray(new String[0])
        );
    }

    if (minNumSplits > result.length) {
        LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                result.length,
                (minNumSplits - result.length)
        );
    }
    return result;
}
示例8
/**
 * Restores a scanner from the serialized scan token bytes held by the split.
 *
 * @param kuduSplit split whose {@code getPb()} supplies the serialized token
 * @return the scanner deserialized with this instance's client
 * @throws RuntimeException wrapping the {@link IOException} raised while deserializing
 */
@Override
public KuduScanner createScanner(KuduSplit kuduSplit) {
    try {
        return KuduScanToken.deserializeIntoScanner(kuduSplit.getPb(), client);
    } catch (IOException deserializationFailure) {
        throw new RuntimeException(deserializationFailure);
    }
}
示例9
/**
 * Serializes the scan token and packages it into a {@code KuduSplit}.
 *
 * @param tableHandle handle passed through to the split
 * @param token scan token to serialize
 * @param primaryKeyColumnCount value passed through to the split
 * @return a split holding the serialized token bytes
 * @throws RuntimeException wrapping the {@link IOException} raised while serializing
 */
private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token,
        int primaryKeyColumnCount) {
    try {
        return new KuduSplit(tableHandle, primaryKeyColumnCount, token.serialize());
    } catch (IOException serializationFailure) {
        throw new RuntimeException(serializationFailure);
    }
}
示例10
/**
 * Creates one input split per scan token returned by {@code scanTokens} for the configured filters,
 * projections and the reader's row limit.
 *
 * <p>Each split holds the serialized token, its index, and a location string per tablet replica.
 * A warning is logged when fewer splits than {@code minNumSplits} are available.
 *
 * @param minNumSplits desired minimum number of splits; only used for the warning
 * @return the splits, in token order
 * @throws IOException propagated from the calls made here that declare it
 */
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    List<KuduScanToken> tokens = scanTokens(tableFilters, tableProjections, readerConfig.getRowLimit());
    KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];

    int ordinal = 0;
    for (KuduScanToken token : tokens) {
        // Resolve replica locations first, then serialize the token into the split.
        List<LocatedTablet.Replica> replicas = token.getTablet().getReplicas();
        String[] hosts = new String[replicas.size()];
        for (int r = 0; r < hosts.length; r++) {
            LocatedTablet.Replica replica = replicas.get(r);
            hosts[r] = getLocation(replica.getRpcHost(), replica.getRpcPort());
        }
        splits[ordinal] = new KuduInputSplit(token.serialize(), ordinal, hosts);
        ordinal++;
    }

    boolean fewerThanRequested = splits.length < minNumSplits;
    if (fewerThanRequested) {
        log.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                splits.length,
                (minNumSplits - splits.length)
        );
    }
    return splits;
}
示例11
/**
 * Plans the scan described by {@code spec} and returns every resulting scan token in serialized form.
 *
 * <p>A client is obtained for the spec's master addresses and closed again when the method exits.
 *
 * @param spec read specification naming the masters and the table, also used to configure the builder
 * @return the serialized scan tokens, in the order the builder produced them
 * @throws KuduException propagated from the Kudu client calls
 */
@Override
public List<byte[]> createTabletScanners(KuduIO.Read spec) throws KuduException {
    try (KuduClient client = getKuduClient(spec.getMasterAddresses())) {
        KuduTable table = client.openTable(spec.getTable());

        KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
        configureBuilder(spec, table.getSchema(), tokenBuilder);

        return tokenBuilder.build()
                .stream()
                .map(token -> uncheckCall(token::serialize))
                .collect(Collectors.toList());
    }
}
示例12
/***
 * Builds a list of scan assignment metadata instances from raw kudu scan tokens as returned by the Kudu Query planner
 * assuming all of the columns and rows are to be scanned. One metadata entry is created per scan token, carrying
 * its ordinal and the total number of tokens.
 * @param partitions The current set of partitions (not consulted by this implementation)
 * @param context The current partitioning context (not consulted by this implementation)
 * @return The new set of partitions
 * @throws Exception if no scan tokens could be obtained for the table; when planning itself failed, that failure
 *     is attached as the cause
 */
public List<KuduPartitionScanAssignmentMeta> getListOfPartitionAssignments(
    Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context) throws Exception
{
  List<KuduScanToken> allColumnsScanTokens = new ArrayList<>();
  // Remember why planning failed so the exception thrown below does not hide the root cause.
  Exception planningFailure = null;
  // we are looking at a first time invocation scenario
  try {
    allColumnsScanTokens.addAll(getKuduScanTokensForSelectAllColumns());
  } catch (Exception e) {
    planningFailure = e;
    LOG.error(" Error while calculating the number of scan tokens for all column projections " + e.getMessage(),e);
  }
  if (allColumnsScanTokens.isEmpty()) {
    LOG.error("No column information could be extracted from the Kudu table");
    // A null cause is permitted and simply means "no underlying failure recorded".
    throw new Exception("No column information could be extracted from the Kudu table", planningFailure);
  }
  int totalPartitionCount = allColumnsScanTokens.size();
  LOG.info("Determined maximum as " + totalPartitionCount + " tablets for this table");
  List<KuduPartitionScanAssignmentMeta> returnList = new ArrayList<>(totalPartitionCount);
  for (int i = 0; i < totalPartitionCount; i++) {
    KuduPartitionScanAssignmentMeta aMeta = new KuduPartitionScanAssignmentMeta();
    aMeta.setOrdinal(i);
    aMeta.setTotalSize(totalPartitionCount);
    returnList.add(aMeta);
    LOG.info("A planned scan meta of the total partitions " + aMeta);
  }
  LOG.info("Total kudu partition size is " + returnList.size());
  return returnList;
}
示例13
/**
 * Deletes every row of the unit test table by scanning all tablets and applying one delete per row.
 *
 * <p>Rows are read through the scan tokens for a select-all query, and each row's three key columns
 * ({@code introwkey}, {@code stringrowkey}, {@code timestamprowkey}) are copied into a delete operation.
 * The session used for the deletes is closed even when a scan or delete fails.
 *
 * @throws Exception propagated from token planning, scanning, applying deletes, closing the session,
 *     or from the final sleep being interrupted
 */
public void truncateTable() throws Exception
{
  AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForDeletingRows =
      unitTestStepwiseScanInputOperator.getScanner();
  List<KuduScanToken> scansForAllTablets = unitTestStepwiseScanInputOperator
      .getPartitioner().getKuduScanTokensForSelectAllColumns();
  ApexKuduConnection aCurrentConnection = scannerForDeletingRows.getConnectionPoolForThreads().get(0);
  KuduSession aSessionForDeletes = aCurrentConnection.getKuduClient().newSession();
  try {
    KuduTable currentTable = aCurrentConnection.getKuduTable();
    for (KuduScanToken aTabletScanToken : scansForAllTablets) {
      KuduScanner aScanner = aTabletScanToken.intoScanner(aCurrentConnection.getKuduClient());
      while (aScanner.hasMoreRows()) {
        RowResultIterator itrForRows = aScanner.nextRows();
        while (itrForRows.hasNext()) {
          RowResult aRow = itrForRows.next();
          int intRowKey = aRow.getInt("introwkey");
          String stringRowKey = aRow.getString("stringrowkey");
          long timestampRowKey = aRow.getLong("timestamprowkey");
          // Rebuild the row key on the delete operation from the scanned row.
          Delete aDeleteOp = currentTable.newDelete();
          aDeleteOp.getRow().addInt("introwkey",intRowKey);
          aDeleteOp.getRow().addString("stringrowkey", stringRowKey);
          aDeleteOp.getRow().addLong("timestamprowkey",timestampRowKey);
          aSessionForDeletes.apply(aDeleteOp);
        }
      }
    }
  } finally {
    // Always release the session, also when a scan or delete throws part-way through.
    aSessionForDeletes.close();
  }
  Thread.sleep(2000); // Sleep to allow for scans to complete
}
示例14
/**
 * Checks that planning a select-all scan yields as many scan tokens as
 * {@code KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE}.
 */
@KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void testKuduSelectAllScanTokens() throws Exception
{
  initOperatorState();
  AbstractKuduInputPartitioner partitionerUnderTest = unitTestStepwiseScanInputOperator.getPartitioner();
  List<KuduScanToken> tokensForFullScan = partitionerUnderTest.getKuduScanTokensForSelectAllColumns();
  int plannedTokenCount = tokensForFullScan.size();
  assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE, plannedTokenCount);
}
示例15
/**
 * Creates PowerMockito mocks for the Kudu connection, builder, client, session, table and scan token
 * builder, and stubs them so that they return one another.
 *
 * <p>The scan token builder mock returns a list of {@code numScanTokens} mocked scan tokens, and
 * {@code KryoCloneUtils.cloneObject} is stubbed to hand back the operator itself.
 *
 * @param abstractKuduInputOperator operator whose {@code getApexKuduConnectionInfo()} is stubbed;
 *                                  it is used with {@code when(...)}, so presumably a mock or spy (confirm at call sites)
 * @param numScanTokens number of mocked scan tokens the token builder should return
 * @return the mocked connection handle
 * @throws Exception propagated from the mocked calls that declare checked exceptions
 */
public ApexKuduConnection buildMockWiring(AbstractKuduInputOperator abstractKuduInputOperator,
    int numScanTokens) throws Exception
{
  // Create all collaborators as mocks first.
  ApexKuduConnection connectionMock = PowerMockito.mock(ApexKuduConnection.class);
  ApexKuduConnection.ApexKuduConnectionBuilder connectionBuilderMock =
      PowerMockito.mock(ApexKuduConnection.ApexKuduConnectionBuilder.class);
  KuduClient clientMock = PowerMockito.mock(KuduClient.class);
  KuduSession sessionMock = PowerMockito.mock(KuduSession.class);
  KuduTable tableMock = PowerMockito.mock(KuduTable.class);
  KuduScanToken.KuduScanTokenBuilder tokenBuilderMock =
      PowerMockito.mock(KuduScanToken.KuduScanTokenBuilder.class);

  List<KuduScanToken> tokenMocks = new ArrayList<>();
  for (int built = 0; built < numScanTokens; built++) {
    KuduScanToken tokenMock = PowerMockito.mock(KuduScanToken.class);
    tokenMocks.add(tokenMock);
  }

  PowerMockito.mockStatic(KryoCloneUtils.class);
  when(KryoCloneUtils.cloneObject(abstractKuduInputOperator)).thenReturn(abstractKuduInputOperator);

  //wire the mocks
  when(abstractKuduInputOperator.getApexKuduConnectionInfo()).thenReturn(connectionBuilderMock);
  when(connectionMock.getKuduClient()).thenReturn(clientMock);
  when(clientMock.newSession()).thenReturn(sessionMock);
  when(connectionMock.getKuduTable()).thenReturn(tableMock);
  when(connectionMock.getKuduSession()).thenReturn(sessionMock);
  when(connectionMock.getBuilderForThisConnection()).thenReturn(connectionBuilderMock);
  when(clientMock.openTable(tableName)).thenReturn(tableMock);
  when(connectionBuilderMock.build()).thenReturn(connectionMock);
  when(tableMock.getSchema()).thenReturn(schemaForUnitTests);
  when(clientMock.newScanTokenBuilder(tableMock)).thenReturn(tokenBuilderMock);
  when(tokenBuilderMock.build()).thenReturn(tokenMocks);

  return connectionMock;
}
示例16
/**
 * Restores a scanner from a serialized scan token and wraps it in a {@code KuduReaderIterator}.
 *
 * @param token serialized scan token
 * @return an iterator over the scanner deserialized with this instance's client
 * @throws IOException propagated from deserializing the token
 */
public KuduReaderIterator scanner(byte[] token) throws IOException {
    return new KuduReaderIterator(
            KuduScanToken.deserializeIntoScanner(token, client)
    );
}
示例17
/***
 * Translates the parsed query into Kudu scan tokens and keeps only those that belong to this operator.
 *
 * The scan token builder is configured with the columns and predicates of the query, the parent operator's
 * fault tolerance flag and, when the query options carry a read snapshot time, a snapshot read at that
 * timestamp. The resulting tokens are sorted by their natural order so that ordinals are stable. For each entry
 * of the operator's partition pie whose ordinal falls inside the sorted token list, a new assignment is created
 * holding the serialized token found at that ordinal; pie entries beyond the list are skipped because a given
 * query may need fewer scan tokens than the full-table plan.
 * @param parsedQuery The parsed query instance
 * @return A list of partition scan metadata objects that are applicable for this instance of the physical operator
 * i.e. the operator owning this instance of the scanner.
 * @throws IOException If the scan assignment cannot be serialized
 */
public List<KuduPartitionScanAssignmentMeta> preparePlanForScanners(SQLToKuduPredicatesTranslator parsedQuery)
  throws IOException
{
  List<KuduPredicate> queryPredicates = parsedQuery.getKuduSQLParseTreeListener().getKuduPredicateList();
  ApexKuduConnection connection = verifyConnectionStaleness(0);// we will have atleast one connection
  KuduScanToken.KuduScanTokenBuilder tokenBuilder = connection.getKuduClient().newScanTokenBuilder(
      connection.getKuduTable());
  tokenBuilder = tokenBuilder.setProjectedColumnNames(new ArrayList<>(
      parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed()));
  for (KuduPredicate predicate : queryPredicates) {
    tokenBuilder = tokenBuilder.addPredicate(predicate);
  }
  tokenBuilder.setFaultTolerant(parentOperator.isFaultTolerantScanner());

  Map<String,String> queryOptions = parentOperator.getOptionsEnabledForCurrentQuery();
  if (queryOptions.containsKey(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME)) {
    try {
      long snapshotMicros = Long.parseLong(queryOptions.get(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME));
      tokenBuilder = tokenBuilder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
      tokenBuilder = tokenBuilder.snapshotTimestampMicros(snapshotMicros);
      LOG.info("Using read snapshot for this query as " + snapshotMicros);
    } catch (Exception ex) {
      // Best effort: an unusable snapshot time is reported and the scan proceeds without it.
      LOG.error("Cannot parse the Read snaptshot time " + ex.getMessage(), ex);
    }
  }

  List<KuduScanToken> sortedTokens = tokenBuilder.build();
  // Make sure we deal with a sorted list of scan tokens (natural ordering of the tokens)
  Collections.sort(sortedTokens);
  LOG.info(" Query will scan " + sortedTokens.size() + " tablets");
  if (LOG.isDebugEnabled()) {
    LOG.debug(" Predicates scheduled for this query are " + queryPredicates.size());
    for (KuduScanToken scheduledToken : sortedTokens) {
      LOG.debug("A tablet scheduled for all operators scanning is " + scheduledToken.getTablet());
    }
  }

  List<KuduPartitionScanAssignmentMeta> pieForThisOperator = parentOperator.getPartitionPieAssignment();
  List<KuduPartitionScanAssignmentMeta> assignments = new ArrayList<>();
  int tokenCount = sortedTokens.size();
  int nextOrdinal = 0;
  for (KuduPartitionScanAssignmentMeta pieSlice : pieForThisOperator) {
    if (pieSlice.getOrdinal() >= tokenCount) {
      // a given query plan might have less scantokens
      continue;
    }
    KuduPartitionScanAssignmentMeta assignment = new KuduPartitionScanAssignmentMeta();
    assignment.setTotalSize(tokenCount);
    assignment.setOrdinal(nextOrdinal);
    nextOrdinal++;
    assignment.setCurrentQuery(parsedQuery.getSqlExpresssion());
    // we pick up only those ordinals that are part of the original partition pie assignment
    KuduScanToken tokenForSlice = sortedTokens.get(pieSlice.getOrdinal());
    assignment.setSerializedKuduScanToken(tokenForSlice.serialize());
    assignments.add(assignment);
    LOG.debug("Added query scan for this operator " + assignment + " with scan tablet as " +
        sortedTokens.get(pieSlice.getOrdinal()).getTablet());
  }
  LOG.info(" A total of " + assignments.size() + " have been scheduled for this operator");
  return assignments;
}
示例18
/**
 * Scans the tablet described by the assigned scan token and pushes every row into the transmit buffer.
 *
 * <p>A begin-of-scan marker record is added first, followed by one record per row (populated through
 * {@code setValuesInPOJO}), and finally an end-of-scan marker once the scanner has been closed. If the
 * scan fails, the scanner is still closed before the failure is rethrown; in that case no end-of-scan
 * marker is added, as before.
 *
 * @return the number of rows scanned
 * @throws Exception propagated from deserializing the token, scanning, instantiating or populating the
 *     payload object, or closing the scanner
 */
@Override
public Long call() throws Exception
{
  long numRowsScanned = 0;
  KuduScanner aPartitionSpecificScanner = KuduScanToken.deserializeIntoScanner(
      kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(), kuduClientHandle);
  try {
    LOG.info("Scanning the following tablet " + KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta
        .getSerializedKuduScanToken(), kuduClientHandle));
    KuduRecordWithMeta<T> beginScanRecord = new KuduRecordWithMeta<>();
    beginScanRecord.setBeginScanMarker(true);
    beginScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
    bufferForTransmittingRecords.add(beginScanRecord); // Add a record entry that denotes the beginning of this scan.
    while (aPartitionSpecificScanner.hasMoreRows()) {
      LOG.debug("Number of columns being returned for this read " +
          aPartitionSpecificScanner.getProjectionSchema().getColumnCount());
      RowResultIterator resultIterator = aPartitionSpecificScanner.nextRows();
      if (resultIterator == null) {
        break;
      }
      while (resultIterator.hasNext()) {
        KuduRecordWithMeta<T> recordWithMeta = new KuduRecordWithMeta<>();
        RowResult aRow = resultIterator.next();
        recordWithMeta.setPositionInScan(numRowsScanned);
        T payload = clazzForResultObject.newInstance();
        recordWithMeta.setThePayload(payload);
        recordWithMeta.setEndOfScanMarker(false);
        recordWithMeta.setTabletMetadata(kuduPartitionScanAssignmentMeta);
        setValuesInPOJO(aRow,payload);
        bufferForTransmittingRecords.add(recordWithMeta);
        numRowsScanned += 1;
      }
    }
  } catch (Exception scanFailure) {
    // Do not leak the scanner when the scan aborts; keep the scan failure as the primary error.
    try {
      aPartitionSpecificScanner.close();
    } catch (Exception closeFailure) {
      scanFailure.addSuppressed(closeFailure);
    }
    throw scanFailure;
  }
  aPartitionSpecificScanner.close();
  KuduRecordWithMeta<T> endScanRecord = new KuduRecordWithMeta<>();
  endScanRecord.setEndOfScanMarker(true);
  endScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
  bufferForTransmittingRecords.add(endScanRecord); // Add a record entry that denotes the end of this scan.
  LOG.info(" Scanned a total of " + numRowsScanned + " for this scanner thread @tablet " +
      KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(),
      kuduClientHandle));
  return numRowsScanned;
}