如何使用Apache Storm Tuple


问题内容

我刚开始使用Apache
Storm。我阅读了本教程,并浏览了示例。我的问题是所有示例都使用非常简单的元组(通常是一个字符串)。元组是内联创建的(使用新的Values(…))。就我而言,我的元组有很多字段(5..100)。所以我的问题是如何为每个字段实现具有名称和类型(所有原始类型)的元组?

有没有例子?(我认为直接实现“元组”不是一个好主意)

谢谢


问题答案:

使用所有字段作为值创建元组的另一种方法是只创建一个bean并将其传递给元组。

给定以下类别:

public class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // add more properties as necessary
    int id;
    String word;

    public DataBean(int id, String word) {
        setId(id);
        setWord(word);
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
}

一键创建并发出DataBean:

collector.emit(new Values(bean));

在目标螺栓中获取DataBean:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        DataBean bean = (DataBean)tuple.getValue(0);
        // do your bolt processing with the bean
    } catch (Exception e) {
        LOG.error("WordCountBolt error", e);
        collector.reportError(e);
    }       
}

设置拓扑时,请不要忘记使bean可序列化并注册:

Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());

免责声明:Beans可以很好地用于随机分组。如果需要执行fieldsGrouping,则仍应使用原语。例如,在“字数统计”方案中,您需要按单词分组,以便发出:

collector.emit(new Values(word, bean));