Java源码示例:org.apache.kudu.client.KuduScanToken

示例1
/**
 * Builds the scan tokens for the table, optionally narrowing the scan.
 *
 * @param tableFilters optional filters; each one is converted into a predicate against the table schema
 * @param tableProjections optional names of the columns to project; ignored when {@code null}
 * @param rowLimit optional row limit; applied only when it is non-null and greater than zero
 * @return the scan tokens produced by the token builder
 */
public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Integer rowLimit) {
    KuduScanToken.KuduScanTokenBuilder scanTokenBuilder = client.newScanTokenBuilder(table);

    if (tableProjections != null) {
        scanTokenBuilder.setProjectedColumnNames(tableProjections);
    }

    if (CollectionUtils.isNotEmpty(tableFilters)) {
        for (KuduFilterInfo filterInfo : tableFilters) {
            scanTokenBuilder.addPredicate(filterInfo.toPredicate(table.getSchema()));
        }
    }

    boolean hasPositiveLimit = rowLimit != null && rowLimit > 0;
    if (hasPositiveLimit) {
        scanTokenBuilder.limit(rowLimit);
    }

    return scanTokenBuilder.build();
}
 
示例2
/**
 * Creates the Kudu client, prepares the scanner and moves to the first record.
 *
 * <p>When the source carries a serialized scan token the scanner is rebuilt from it;
 * otherwise the table named in the spec is opened and a new scanner is configured for it.
 *
 * @return the result of the first {@code advance()} call
 * @throws IOException propagated from the client, scanner or {@code advance()} calls
 */
@Override
public boolean start() throws IOException {
  LOG.debug("Starting Kudu reader");
  AsyncKuduClient asyncClient =
      new AsyncKuduClient.AsyncKuduClientBuilder(source.spec.getMasterAddresses()).build();
  client = asyncClient.syncClient();

  if (source.serializedToken == null) {
    // no token: the source has not been split, so build a scanner for the table itself
    KuduTable table = client.openTable(source.spec.getTable());
    KuduScanner.KuduScannerBuilder scannerBuilder =
        table.getAsyncClient().syncClient().newScannerBuilder(table);

    configureBuilder(source.spec, table.getSchema(), scannerBuilder);
    scanner = scannerBuilder.build();
  } else {
    // tokens available if the source is already split
    scanner = KuduScanToken.deserializeIntoScanner(source.serializedToken, client);
  }

  return advance();
}
 
示例3
/**
 * Builds a set of scan tokens. The list of scan tokens are generated as if the entire table is being scanned
 * i.e. a SELECT * FROM TABLE equivalent expression. This list is used to assign the partition pie assignments
 * for all of the planned partition of operators. Each operator gets a part of the PIE as if all columns were
 * selected. Subsequently when a query is to be processed, the query is used to generate the scan tokens applicable
 * for that query. Given that partition pie represents the entire data set, the scan assignments for the current
 * query will be a subset.
 *
 * <p>The connection built for planning is always closed before this method returns, including when
 * building the scan tokens fails.
 *
 * @return The list of scan tokens as if the entire table is getting scanned.
 * @throws Exception if the scan tokens cannot be built or the connection to the kudu cluster cannot be closed.
 */
public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception
{
  // We are not using the current query for deciding the partition strategy but a SELECT * as
  // we do not want to optimize on just the current query. This prevents rapid throttling of operator
  // instances when the scan patterns are erratic. On the other hand, this might result in under utilized
  // operator resources in the DAG but will be consistent at a minimum.
  ApexKuduConnection apexKuduConnection = prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
  try {
    KuduClient clientHandle = apexKuduConnection.getKuduClient();
    KuduTable table = apexKuduConnection.getKuduTable();
    KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
    List<ColumnSchema> columnList = table.getSchema().getColumns();
    List<String> allColumns = new ArrayList<>(columnList.size());
    for (ColumnSchema column : columnList) {
      allColumns.add(column.getName());
    }
    builder.setProjectedColumnNames(allColumns);
    LOG.debug("Building the partition pie assignments for the input operator");
    return builder.build();
  } finally {
    // release the planning connection even when token generation throws
    apexKuduConnection.close();
  }
}
 
示例4
/**
 * Rebuilds a scanner from the serialized scan token carried by the given split.
 *
 * @param kuduSplit the split holding the serialized scan token
 * @return the scanner deserialized from the token
 * @throws RuntimeException wrapping the {@link IOException} raised during deserialization
 */
public KuduScanner createScanner(KuduSplit kuduSplit)
{
    try {
        byte[] serializedToken = kuduSplit.getSerializedScanToken();
        return KuduScanToken.deserializeIntoScanner(serializedToken, client);
    }
    catch (IOException deserializationFailure) {
        throw new RuntimeException(deserializationFailure);
    }
}
 
示例5
/**
 * Wraps a scan token into a split by serializing the token.
 *
 * @param tableHandle the table handle passed through to the split
 * @param token the scan token to serialize
 * @param primaryKeyColumnCount the primary key column count passed through to the split
 * @return a split carrying the serialized token
 * @throws PrestoException with {@code GENERIC_INTERNAL_ERROR} when the token cannot be serialized
 */
private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token,
        int primaryKeyColumnCount)
{
    final byte[] tokenBytes;
    try {
        tokenBytes = token.serialize();
    }
    catch (IOException serializationFailure) {
        throw new PrestoException(GENERIC_INTERNAL_ERROR, serializationFailure);
    }
    return new KuduSplit(tableHandle, primaryKeyColumnCount, tokenBytes);
}
 
示例6
/**
 * Creates one input split for every scan token produced by the table context.
 *
 * <p>Each split carries the serialized token, the token's position in the token list and one
 * location per replica of the token's tablet, derived from the replica's RPC host and port.
 * A warning is logged when fewer splits than {@code minNumSplits} are produced.
 *
 * @param minNumSplits the minimum number of splits the caller would like to receive
 * @return the created splits, in token order
 * @throws IOException propagated from the calls made here (for example token serialization)
 */
@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    startTableContext();
    Preconditions.checkNotNull(tableContext, "tableContext should not be null");

    List<KuduScanToken> scanTokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
    KuduInputSplit[] inputSplits = new KuduInputSplit[scanTokens.size()];

    int splitNumber = 0;
    for (KuduScanToken scanToken : scanTokens) {
        List<LocatedTablet.Replica> replicas = scanToken.getTablet().getReplicas();

        // one location entry per replica hosting this tablet
        String[] replicaLocations = new String[replicas.size()];
        int slot = 0;
        for (LocatedTablet.Replica replica : replicas) {
            replicaLocations[slot++] = getLocation(replica.getRpcHost(), replica.getRpcPort());
        }

        inputSplits[splitNumber] = new KuduInputSplit(scanToken.serialize(), splitNumber, replicaLocations);
        splitNumber++;
    }

    int missingSplits = minNumSplits - inputSplits.length;
    if (missingSplits > 0) {
        LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                inputSplits.length,
                missingSplits
        );
    }

    return inputSplits;
}
 
示例7
/**
 * Turns the scan tokens of the table context into input splits, one split per token.
 *
 * <p>A split holds the serialized token, the index of the token and the locations of all
 * replicas of the token's tablet (built from each replica's RPC host and port). When the
 * number of splits is below {@code minNumSplits} a warning is logged.
 *
 * @param minNumSplits the minimum number of splits the caller would like to receive
 * @return the splits, indexed in the same order as the tokens
 * @throws IOException propagated from the calls made here (for example token serialization)
 */
@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    startTableContext();
    Preconditions.checkNotNull(tableContext, "tableContext should not be null");

    List<KuduScanToken> plannedTokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
    final int tokenCount = plannedTokens.size();
    KuduInputSplit[] result = new KuduInputSplit[tokenCount];

    for (int index = 0; index < tokenCount; index++) {
        KuduScanToken current = plannedTokens.get(index);
        List<LocatedTablet.Replica> tabletReplicas = current.getTablet().getReplicas();

        // resolve the location of every replica serving this tablet
        String[] hosts = new String[tabletReplicas.size()];
        int position = 0;
        for (LocatedTablet.Replica tabletReplica : tabletReplicas) {
            hosts[position] = getLocation(tabletReplica.getRpcHost(), tabletReplica.getRpcPort());
            position++;
        }

        result[index] = new KuduInputSplit(current.serialize(), index, hosts);
    }

    if (minNumSplits > result.length) {
        LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                result.length,
                (minNumSplits - result.length)
        );
    }

    return result;
}
 
示例8
/**
 * Rebuilds a scanner from the serialized token bytes stored in the split.
 *
 * @param kuduSplit the split whose {@code getPb()} bytes hold the serialized scan token
 * @return the scanner deserialized from those bytes
 * @throws RuntimeException wrapping the {@link IOException} raised during deserialization
 */
@Override
public KuduScanner createScanner(KuduSplit kuduSplit) {
    try {
        return KuduScanToken.deserializeIntoScanner(kuduSplit.getPb(), client);
    } catch (IOException deserializationFailure) {
        throw new RuntimeException(deserializationFailure);
    }
}
 
示例9
/**
 * Builds a split that carries the serialized form of the given scan token.
 *
 * @param tableHandle the table handle passed through to the split
 * @param token the scan token to serialize
 * @param primaryKeyColumnCount the primary key column count passed through to the split
 * @return the split holding the serialized token
 * @throws RuntimeException wrapping the {@link IOException} raised when serialization fails
 */
private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token,
                              int primaryKeyColumnCount) {
    try {
        return new KuduSplit(tableHandle, primaryKeyColumnCount, token.serialize());
    } catch (IOException serializationFailure) {
        throw new RuntimeException(serializationFailure);
    }
}
 
示例10
/**
 * Creates one input split per scan token planned for the configured filters, projections and row limit.
 *
 * <p>Every split carries the serialized token, the token's index and one location per replica of
 * the token's tablet (built from the replica's RPC host and port). A warning is logged when the
 * number of splits is lower than {@code minNumSplits}.
 *
 * @param minNumSplits the minimum number of splits the caller would like to receive
 * @return the created splits, in token order
 * @throws IOException propagated from the calls made here (for example token serialization)
 */
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    List<KuduScanToken> scanTokenList = scanTokens(tableFilters, tableProjections, readerConfig.getRowLimit());
    KuduInputSplit[] createdSplits = new KuduInputSplit[scanTokenList.size()];

    int ordinal = 0;
    for (KuduScanToken scanToken : scanTokenList) {
        List<LocatedTablet.Replica> replicaList = scanToken.getTablet().getReplicas();

        // collect the location of each replica that serves this tablet
        String[] splitLocations = new String[replicaList.size()];
        int cursor = 0;
        for (LocatedTablet.Replica replica : replicaList) {
            splitLocations[cursor++] = getLocation(replica.getRpcHost(), replica.getRpcPort());
        }

        createdSplits[ordinal] = new KuduInputSplit(scanToken.serialize(), ordinal, splitLocations);
        ordinal++;
    }

    int idleInstances = minNumSplits - createdSplits.length;
    if (idleInstances > 0) {
        log.warn(" The minimum desired number of splits with your configured parallelism level " +
                        "is {}. Current kudu splits = {}. {} instances will remain idle.",
                minNumSplits,
                createdSplits.length,
                idleInstances
        );
    }

    return createdSplits;
}
 
示例11
/**
 * Plans the scan described by the spec and returns every resulting scan token in serialized form.
 *
 * <p>A client is opened for the spec's master addresses and closed again when this method returns.
 *
 * @param spec the read specification naming the master addresses and the table
 * @return the serialized scan tokens, in the order the token builder produced them
 * @throws KuduException propagated from the Kudu client calls made here
 */
@Override
public List<byte[]> createTabletScanners(KuduIO.Read spec) throws KuduException {
  try (KuduClient client = getKuduClient(spec.getMasterAddresses())) {
    KuduTable targetTable = client.openTable(spec.getTable());
    KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(targetTable);
    configureBuilder(spec, targetTable.getSchema(), tokenBuilder);
    return tokenBuilder.build().stream()
        .map(scanToken -> uncheckCall(scanToken::serialize))
        .collect(Collectors.toList());
  }
}
 
示例12
/***
 * Builds a list of scan assignment metadata instances from raw kudu scan tokens as returned by the Kudu Query planner
 *  assuming all of the columns and rows are to be scanned. One metadata instance is created per scan token; each
 *  carries its ordinal and the total number of tokens.
 * @param partitions The current set of partitions (not read by this method)
 * @param context The current partitioning context (not read by this method)
 * @return The new set of partitions
 * @throws Exception if no scan tokens could be obtained; when obtaining them failed, that failure is attached
 * as the cause
 */
public List<KuduPartitionScanAssignmentMeta> getListOfPartitionAssignments(
    Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context) throws Exception
{
  List<KuduPartitionScanAssignmentMeta> returnList = new ArrayList<>();
  List<KuduScanToken> allColumnsScanTokens = new ArrayList<>();
  // remembered so that the failure below can report why no tokens were available
  Exception scanTokenFailure = null;
  // we are looking at a first time invocation scenario
  try {
    allColumnsScanTokens.addAll(getKuduScanTokensForSelectAllColumns());
  } catch (Exception e) {
    scanTokenFailure = e;
    LOG.error(" Error while calculating the number of scan tokens for all column projections " + e.getMessage(),e);
  }
  if (allColumnsScanTokens.isEmpty()) {
    LOG.error("No column information could be extracted from the Kudu table");
    // the cause is null when the planner simply returned no tokens
    throw new Exception("No column information could be extracted from the Kudu table", scanTokenFailure);
  }
  int totalPartitionCount = allColumnsScanTokens.size();
  LOG.info("Determined maximum as " + totalPartitionCount + " tablets for this table");
  for (int i = 0; i < totalPartitionCount; i++) {
    KuduPartitionScanAssignmentMeta aMeta = new KuduPartitionScanAssignmentMeta();
    aMeta.setOrdinal(i);
    aMeta.setTotalSize(totalPartitionCount);
    returnList.add(aMeta);
    LOG.info("A planned scan meta of the total partitions " + aMeta);
  }
  LOG.info("Total kudu partition size is " + returnList.size());
  return returnList;
}
 
示例13
/**
 * Deletes every row of the unit test table by scanning all tablets and issuing one delete per row.
 *
 * <p>Each row is identified by its {@code introwkey}, {@code stringrowkey} and {@code timestamprowkey}
 * columns. Scanners and the delete session are closed even when a scan or a delete fails.
 *
 * @throws Exception propagated from the scan, delete, close or sleep calls made here
 */
public void truncateTable() throws Exception
{
  AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForDeletingRows =
      unitTestStepwiseScanInputOperator.getScanner();
  List<KuduScanToken> scansForAllTablets = unitTestStepwiseScanInputOperator
      .getPartitioner().getKuduScanTokensForSelectAllColumns();
  ApexKuduConnection aCurrentConnection = scannerForDeletingRows.getConnectionPoolForThreads().get(0);
  KuduSession aSessionForDeletes = aCurrentConnection.getKuduClient().newSession();
  try {
    KuduTable currentTable = aCurrentConnection.getKuduTable();
    for ( KuduScanToken aTabletScanToken : scansForAllTablets) {
      KuduScanner aScanner = aTabletScanToken.intoScanner(aCurrentConnection.getKuduClient());
      try {
        while ( aScanner.hasMoreRows()) {
          RowResultIterator itrForRows = aScanner.nextRows();
          while ( itrForRows.hasNext()) {
            RowResult aRow = itrForRows.next();
            int intRowKey = aRow.getInt("introwkey");
            String stringRowKey = aRow.getString("stringrowkey");
            long timestampRowKey = aRow.getLong("timestamprowkey");
            Delete aDeleteOp = currentTable.newDelete();
            aDeleteOp.getRow().addInt("introwkey",intRowKey);
            aDeleteOp.getRow().addString("stringrowkey", stringRowKey);
            aDeleteOp.getRow().addLong("timestamprowkey",timestampRowKey);
            aSessionForDeletes.apply(aDeleteOp);
          }
        }
      } finally {
        // do not leave the scanner open when a read or delete fails mid-scan
        aScanner.close();
      }
    }
  } finally {
    aSessionForDeletes.close();
  }
  Thread.sleep(2000); // Sleep to allow for scans to complete
}
 
示例14
/**
 * Checks that planning a select-all scan yields as many scan tokens as
 * {@code KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE}.
 */
@KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void testKuduSelectAllScanTokens() throws Exception
{
  initOperatorState();
  List<KuduScanToken> tokensForFullScan =
      unitTestStepwiseScanInputOperator.getPartitioner().getKuduScanTokensForSelectAllColumns();
  int plannedTokenCount = tokensForFullScan.size();
  assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE, plannedTokenCount);
}
 
示例15
/**
 * Creates mocks for the Kudu connection, its builder, client, session, table and scan token builder
 * and stubs them so that they return one another.
 *
 * <p>The scan token builder mock returns a list of {@code numScanTokens} mocked scan tokens, and
 * {@code KryoCloneUtils.cloneObject} is stubbed to hand back the given operator itself.
 *
 * @param abstractKuduInputOperator the operator whose connection info and clone behavior are stubbed
 * @param numScanTokens the number of mocked scan tokens the token builder mock should return
 * @return the mocked connection handle
 * @throws Exception declared for the mocking and stubbing calls made here
 */
public ApexKuduConnection buildMockWiring(AbstractKuduInputOperator abstractKuduInputOperator,
    int numScanTokens) throws Exception
{
  // create the collaborators
  ApexKuduConnection connectionMock = PowerMockito.mock(ApexKuduConnection.class);
  ApexKuduConnection.ApexKuduConnectionBuilder connectionBuilderMock = PowerMockito.mock(
      ApexKuduConnection.ApexKuduConnectionBuilder.class);
  KuduClient clientMock = PowerMockito.mock(KuduClient.class);
  KuduSession sessionMock = PowerMockito.mock(KuduSession.class);
  KuduTable tableMock = PowerMockito.mock(KuduTable.class);
  KuduScanToken.KuduScanTokenBuilder tokenBuilderMock = PowerMockito.mock(
      KuduScanToken.KuduScanTokenBuilder.class);

  // one mocked token per requested scan token
  List<KuduScanToken> tokenMocks = new ArrayList<>();
  for (int remaining = numScanTokens; remaining > 0; remaining--) {
    tokenMocks.add(PowerMockito.mock(KuduScanToken.class));
  }

  // cloning the operator hands back the very same instance
  PowerMockito.mockStatic(KryoCloneUtils.class);
  when(KryoCloneUtils.cloneObject(abstractKuduInputOperator)).thenReturn(abstractKuduInputOperator);

  //wire the mocks
  when(abstractKuduInputOperator.getApexKuduConnectionInfo()).thenReturn(connectionBuilderMock);
  when(connectionMock.getKuduClient()).thenReturn(clientMock);
  when(clientMock.newSession()).thenReturn(sessionMock);
  when(connectionMock.getKuduTable()).thenReturn(tableMock);
  when(connectionMock.getKuduSession()).thenReturn(sessionMock);
  when(connectionMock.getBuilderForThisConnection()).thenReturn(connectionBuilderMock);
  when(clientMock.openTable(tableName)).thenReturn(tableMock);
  when(connectionBuilderMock.build()).thenReturn(connectionMock);
  when(tableMock.getSchema()).thenReturn(schemaForUnitTests);
  when(clientMock.newScanTokenBuilder(tableMock)).thenReturn(tokenBuilderMock);
  when(tokenBuilderMock.build()).thenReturn(tokenMocks);
  return connectionMock;
}
 
示例16
/**
 * Creates a reader iterator over the scanner rebuilt from a serialized scan token.
 *
 * @param token the serialized scan token
 * @return an iterator wrapping the deserialized scanner
 * @throws IOException if the token cannot be deserialized into a scanner
 */
public KuduReaderIterator scanner(byte[] token) throws IOException {
    return new KuduReaderIterator(
            KuduScanToken.deserializeIntoScanner(token, client)
    );
}
 
示例17
/***
 * Takes the parsed query and builds the Kudu scan tokens specific to that query.
 * The tokens are sorted first, and only then are the tokens belonging to the current physical instance of the
 * operator picked. The scan token builder plans for the query as a whole and does not know how the work
 * is spread over operator instances, so this method keeps only those entries of the plan whose ordinal is part
 * of the partition pie assigned to this operator at partitioning time.
 * @param parsedQuery The parsed query instance
 * @return A list of partition scan metadata objects that are applicable for this instance of the physical operator
 * i.e. the operator owning this instance of the scanner.
 * @throws IOException If the scan assignment cannot be serialized
 */
public List<KuduPartitionScanAssignmentMeta> preparePlanForScanners(SQLToKuduPredicatesTranslator parsedQuery)
  throws IOException
{
  List<KuduPredicate> queryPredicates = parsedQuery.getKuduSQLParseTreeListener().getKuduPredicateList();
  ApexKuduConnection connection = verifyConnectionStaleness(0);// we will have atleast one connection
  KuduScanToken.KuduScanTokenBuilder tokenBuilder = connection.getKuduClient().newScanTokenBuilder(
      connection.getKuduTable());
  tokenBuilder = tokenBuilder.setProjectedColumnNames(new ArrayList<>(
      parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed()));
  for (KuduPredicate predicate : queryPredicates) {
    tokenBuilder = tokenBuilder.addPredicate(predicate);
  }
  tokenBuilder.setFaultTolerant(parentOperator.isFaultTolerantScanner());

  // optional snapshot read, requested through the query options
  Map<String,String> queryOptions = parentOperator.getOptionsEnabledForCurrentQuery();
  if (queryOptions.containsKey(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME)) {
    try {
      long snapshotTime = Long.parseLong(queryOptions.get(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME));
      tokenBuilder = tokenBuilder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
      tokenBuilder = tokenBuilder.snapshotTimestampMicros(snapshotTime);
      LOG.info("Using read snapshot for this query as " + snapshotTime);
    } catch (Exception ex) {
      // an unusable snapshot time is logged and the plan is built without it
      LOG.error("Cannot parse the Read snaptshot time " + ex.getMessage(), ex);
    }
  }

  List<KuduScanToken> sortedTokens = tokenBuilder.build();
  // Make sure we deal with a sorted list of scan tokens (natural ordering of the tokens)
  Collections.sort(sortedTokens);
  LOG.info(" Query will scan " + sortedTokens.size() + " tablets");
  if (LOG.isDebugEnabled()) {
    LOG.debug(" Predicates scheduled for this query are " + queryPredicates.size());
    for (KuduScanToken plannedToken : sortedTokens) {
      LOG.debug("A tablet scheduled for all operators scanning is " + plannedToken.getTablet());
    }
  }

  List<KuduPartitionScanAssignmentMeta> pieOfThisOperator = parentOperator.getPartitionPieAssignment();
  List<KuduPartitionScanAssignmentMeta> assignments = new ArrayList<>();
  int tokenCount = sortedTokens.size();
  int nextAssignmentOrdinal = 0;
  for (KuduPartitionScanAssignmentMeta pieSlice : pieOfThisOperator) {
    int pieOrdinal = pieSlice.getOrdinal();
    if (pieOrdinal >= tokenCount) {
      // a given query plan might have less scantokens
      continue;
    }
    KuduPartitionScanAssignmentMeta assignment = new KuduPartitionScanAssignmentMeta();
    assignment.setTotalSize(tokenCount);
    assignment.setOrdinal(nextAssignmentOrdinal);
    nextAssignmentOrdinal++;
    assignment.setCurrentQuery(parsedQuery.getSqlExpresssion());
    // we pick up only those ordinals that are part of the original partition pie assignment
    KuduScanToken tokenForThisOperator = sortedTokens.get(pieOrdinal);
    assignment.setSerializedKuduScanToken(tokenForThisOperator.serialize());
    assignments.add(assignment);
    LOG.debug("Added query scan for this operator " + assignment + " with scan tablet as " +
        tokenForThisOperator.getTablet());
  }
  LOG.info(" A total of " + assignments.size() + " have been scheduled for this operator");
  return assignments;
}
 
示例18
/**
 * Scans the tablet described by the assigned scan token and buffers one record per row.
 *
 * <p>A begin-of-scan marker record is buffered first, then one record per scanned row (carrying its
 * position in the scan and a freshly populated payload object), and finally an end-of-scan marker
 * record. The scanner is closed even when scanning fails; in that case no end-of-scan marker is
 * buffered and the failure is propagated.
 *
 * @return the number of rows scanned
 * @throws Exception propagated from deserializing the token, scanning, instantiating or populating
 *     the payload, or closing the scanner
 */
@Override
public Long call() throws Exception
{
  long numRowsScanned = 0;
  KuduScanner aPartitionSpecificScanner = KuduScanToken.deserializeIntoScanner(
      kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(), kuduClientHandle);
  try {
    LOG.info("Scanning the following tablet " + KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta
        .getSerializedKuduScanToken(), kuduClientHandle));
    KuduRecordWithMeta<T> beginScanRecord = new KuduRecordWithMeta<>();
    beginScanRecord.setBeginScanMarker(true);
    beginScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
    bufferForTransmittingRecords.add(beginScanRecord); // Add a record entry that denotes the beginning of this scan.
    while ( aPartitionSpecificScanner.hasMoreRows()) {
      LOG.debug("Number of columns being returned for this read " +
          aPartitionSpecificScanner.getProjectionSchema().getColumnCount());
      RowResultIterator resultIterator = aPartitionSpecificScanner.nextRows();
      if (resultIterator == null) {
        break;
      }
      while (resultIterator.hasNext()) {
        KuduRecordWithMeta<T> recordWithMeta = new KuduRecordWithMeta<>();
        RowResult aRow = resultIterator.next();
        recordWithMeta.setPositionInScan(numRowsScanned);
        T payload = clazzForResultObject.newInstance();
        recordWithMeta.setThePayload(payload);
        recordWithMeta.setEndOfScanMarker(false);
        recordWithMeta.setTabletMetadata(kuduPartitionScanAssignmentMeta);
        setValuesInPOJO(aRow,payload);
        bufferForTransmittingRecords.add(recordWithMeta);
        numRowsScanned += 1;
      }
    }
  } finally {
    // release the scanner on both the normal and the failure path
    aPartitionSpecificScanner.close();
  }
  KuduRecordWithMeta<T> endScanRecord = new KuduRecordWithMeta<>();
  endScanRecord.setEndOfScanMarker(true);
  endScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
  bufferForTransmittingRecords.add(endScanRecord); // Add a record entry that denotes the end of this scan.
  LOG.info(" Scanned a total of " + numRowsScanned + " for this scanner thread @tablet " +
      KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(),
      kuduClientHandle));
  return numRowsScanned;
}