如何使用Apache Storm Tuple
问题内容:
我刚开始使用Apache
Storm。我阅读了本教程,并浏览了示例。我的问题是所有示例都使用非常简单的元组(通常是一个字符串)。元组是内联创建的(使用新的Values(…))。就我而言,我的元组有很多字段(5..100)。所以我的问题是如何为每个字段实现具有名称和类型(所有原始类型)的元组?
有没有例子?(我认为直接实现“元组”不是一个好主意)
谢谢
问题答案:
使用所有字段作为值创建元组的另一种方法是只创建一个bean并将其传递给元组。
给定以下类别:
public class DataBean implements Serializable {
private static final long serialVersionUID = 1L;
// add more properties as necessary
int id;
String word;
public DataBean(int id, String word) {
setId(id);
setWord(word);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
一键创建并发出DataBean:
collector.emit(new Values(bean));
在目标螺栓中获取DataBean:
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
DataBean bean = (DataBean)tuple.getValue(0);
// do your bolt processing with the bean
} catch (Exception e) {
LOG.error("WordCountBolt error", e);
collector.reportError(e);
}
}
设置拓扑时,请不要忘记使bean可序列化并注册:
Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());
免责声明:Beans可以很好地用于随机分组。如果需要执行fieldsGrouping
,则仍应使用原语。例如,在“字数统计”方案中,您需要按单词分组,以便发出:
collector.emit(new Values(word, bean));