提问者:小点点

使用Spring批处理文件项读取器的多线程


在Spring批处理中,我试图读取CSV文件,并希望将每一行分配给一个单独的线程并对其进行处理。我试图通过使用TaskExecutor来实现它,但所有线程都在一次拾取同一行。我还尝试使用Partioner实现这个概念,同样的事情也发生了。请参阅下面我的配置Xml。

步骤说明

    <step id="Step2">
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <!-- split it -->
      <property name="lineTokenizer">
            <bean
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
        </bean>
      </property>
      <property name="fieldSetMapper">   

          <!-- map to an object -->
          <bean
            class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
            <property name="prototypeBeanName" value="user" />
          </bean>           
      </property>

      </bean>
  </property>

       </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 <property name="concurrencyLimit" value="4"/>   

我尝试过不同类型的任务执行器,但它们的行为方式都是一样的。如何将每一行分配给单独的线程?


共3个答案

匿名用户

FlatFileItemReader不是线程安全的。在您的示例中,您可以尝试将CSV文件拆分为较小的CSV文件,然后使用MultiResourcePartitioner来处理其中的每一个文件。这可以通过两个步骤完成,一个用于拆分原始文件(如10个较小的文件),另一个用于处理拆分的文件。这样,您就不会有任何问题,因为每个文件都将由一个线程处理。

例子:

<batch:job id="csvsplitandprocess">
     <batch:step id="step1" next="step2master">
    <batch:tasklet>
        <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
    <batch:step id="step2master">
    <partition step="step2" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</batch:step>
</batch:job>

<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="10" />
    </bean>

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>

替代分区的方法可能是定制线程安全读取器,它将为每一行创建一个线程,但分区可能是您的最佳选择

匿名用户

你的问题是你的读者不在范围步骤中。

这意味着:所有线程共享相同的输入流(资源文件)。

要为每个线程处理一行,您需要:

  1. 确保所有线程都从文件的开头到结尾读取文件(每个线程都应该为每个执行上下文打开流并关闭它)
  2. 分区程序必须为每个执行上下文注入开始和结束位置。
  3. 您的读者必须阅读具有此位置的文件。

我编写了一些代码,这是输出:

com的代码。测验分区器。RangePartitioner类:

public Map<String, ExecutionContext> partition() {

    Map < String, ExecutionContext > result = new HashMap < String, ExecutionContext >();

    int range = 1;
    int fromId = 1;
    int toId = range;

    for (int i = 1; i <= gridSize; i++) {
        ExecutionContext value = new ExecutionContext();

        log.debug("\nStarting : Thread" + i);
        log.debug("fromId : " + fromId);
        log.debug("toId : " + toId);

        value.putInt("fromId", fromId);
        value.putInt("toId", toId);

        // give each thread a name, thread 1,2,3
        value.putString("name", "Thread" + i);

        result.put("partition" + i, value);

        fromId = toId + 1;
        toId += range;

    }

    return result;
}

--

开始:线程1 fromId: 1 toId: 1

开始:线程2 fromId: 2 toId: 2

起始:Thread3从ID:3到ID:3

开始:线程4 fromId: 4 toId: 4

起始:Thread5 fromId:5 toId:5

起始:Thread6从ID:6到ID:6

开始:Thread7 fromId: 7 toId: 7

起始:Thread8 fromId:8 toId:8

起始:Thread9 fromId:9 toId:9

开始:线程10 fromId: 10 toId: 10

看看下面的配置:

http://www.springframework.org/schema/batch/spring-batch-2.2.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.2.xsd"

<import resource="../config/context.xml" />
<import resource="../config/database.xml" />

<bean id="mouvement" class="com.test.model.Mouvement" scope="prototype" />

<bean id="itemProcessor" class="com.test.processor.CustomItemProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<bean id="xmlItemWriter" class="com.test.writer.ItemWriter" />

<batch:job id="mouvementImport" xmlns:batch="http://www.springframework.org/schema/batch">
    <batch:listeners>
        <batch:listener ref="myAppJobExecutionListener" />
    </batch:listeners>

    <batch:step id="masterStep">
        <batch:partition step="slave" partitioner="rangePartitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

<bean id="rangePartitioner" class="com.test.partitioner.RangePartitioner" />

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<batch:step id="slave">
    <batch:tasklet>
        <batch:listeners>
            <batch:listener ref="stepExecutionListener" />
        </batch:listeners>

        <batch:chunk reader="mouvementReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="1">
        </batch:chunk>

    </batch:tasklet>
</batch:step>



<bean id="stepExecutionListener" class="com.test.listener.step.StepExecutionListenerCtxInjecter" scope="step" />

<bean id="myAppJobExecutionListener" class="com.test.listener.job.MyAppJobExecutionListener" />

<bean id="mouvementReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

    <property name="resource" value="classpath:XXXXX/XXXXXXXX.csv" />

    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="|" />
                    <property name="names"
                        value="id,numen,prenom,grade,anneeScolaire,academieOrigin,academieArrivee,codeUsi,specialiteEmploiType,natureSupport,dateEffet,modaliteAffectation" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.test.mapper.MouvementFieldSetMapper" />
            </property>
        </bean>
    </property>

</bean>

<!--    <bean id="itemReader" scope="step" autowire-candidate="false" parent="mouvementReaderParent">-->
<!--        <property name="resource" value="#{stepExecutionContext[fileName]}" />-->
<!--    </bean>-->

<bean id="mouvementReader" class="com.test.reader.MouvementItemReader" scope="step">
    <property name="delegate" ref="mouvementReaderParent" />
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
</bean>

<!--    <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">-->
<!--        <property name="resource" value="file:xml/outputs/Mouvements.xml" />-->
<!--        <property name="marshaller" ref="reportMarshaller" />-->
<!--        <property name="rootTagName" value="Mouvement" />-->
<!--    </bean>-->

<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.test.model.Mouvement</value>
        </list>
    </property>
</bean>

待办事项:更改其他读取位置(开始和结束位置)的阅读器,如java中的扫描仪类。

希望这有帮助。

匿名用户

您可以将输入文件拆分为多个文件,使用Partitionner并使用线程加载小文件,但如果出现错误,必须在清理数据库后重新启动所有作业。

<batch:job id="transformJob">
<batch:step id="deleteDir" next="cleanDB">
    <batch:tasklet ref="fileDeletingTasklet" />
</batch:step>
<batch:step id="cleanDB" next="split">
    <batch:tasklet ref="countThreadTasklet" />
</batch:step>
<batch:step id="split" next="partitionerMasterImporter">
    <batch:tasklet>
        <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" />
    </batch:tasklet>
</batch:step>
<batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
    <partition step="importChunked" partitioner="filePartitioner">
        <handler grid-size="10" task-executor="taskExecutor" />
    </partition>
</batch:step>

完整的示例代码工作(在Github上)

希望这有帮助。