The proctime attribute 'rowtime' must not replace an existing field.
The failing code is as follows:
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
import java.sql.Timestamp;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Simple POJO.
 */
public class OrderStream
{
    public int id;
    public Long user;
    public String product;
    public int amount;
    // Epoch seconds; this field name is the one that later clashes with the proctime attribute.
    public Long rowtime;

    // Public no-argument constructor, required for Flink to treat the class as a POJO.
    public OrderStream()
    {
    }

    public OrderStream(int id, Long user, String product, int amount, Long rowtime)
    {
        this.id = id;
        this.user = user;
        this.product = product;
        this.amount = amount;
        this.rowtime = rowtime;
    }

    @Override
    public String toString()
    {
        return "Order{" +
                "id=" + id +
                ", user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                ", ts=" + rowtime +
                '}';
    }
}
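
// Fragment from main(). It needs java.util.Arrays,
// org.apache.flink.streaming.api.datastream.DataStream and a static import of
// org.apache.flink.table.api.Expressions.$.
// env is presumably the StreamExecutionEnvironment and tEnv the StreamTableEnvironment.
DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
        new OrderStream(1, 1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
        new OrderStream(2, 1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
        new OrderStream(3, 3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
        new OrderStream(4, 3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
        new OrderStream(5, 1L, "diaper", 4, 1505528400L), // 2017-09-16 10:20:00
        new OrderStream(6, 1L, "diaper", 4, 1505528400L)  // 2017-09-16 10:20:00
));
// This call throws the ValidationException: "rowtime" is already a field of OrderStream.
tEnv.createTemporaryView("Orders", orderA,
        $("id"), $("user"), $("product"), $("amount"), $("rowtime").proctime());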
The full error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: The proctime attribute 'rowtime' must not replace an existing field.
at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.validateProctimeDoesNotReplaceField(FieldInfoUtils.java:606)
at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.createProctimeFieldInfo(FieldInfoUtils.java:599)
at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:564)
at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:536)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at org.apache.flink.table.typeutils.FieldInfoUtils.lambda$extractFieldInfosByNameReference$7(FieldInfoUtils.java:431)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfosByNameReference(FieldInfoUtils.java:432)
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:266)
at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:233)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lambda$asQueryOperation$0(StreamTableEnvironmentImpl.java:384)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.java:383)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:230)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:262)
at DistinctAggregation4.main(DistinctAggregation4.java:35)
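Cause and solution:
Cause | Solution |
With proctime(), the attribute name must not be the name of an existing field of OrderStream, because the processing-time attribute is appended as a new column. | tEnv.createTemporaryView("Orders", orderA, $("id"), $("user"), $("product"), $("amount"), $("rowtime1").proctime()); |
With rowtime(), the attribute name may be the name of an existing field of OrderStream. | |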
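
The naming rule in the table can be checked before the job is submitted. The sketch below is not Flink code and does not reproduce Flink's own validation: `ProctimeNameCheck`, `publicFieldNames` and `collides` are hypothetical names, the nested `OrderStream` is a trimmed copy of the POJO above, and only public fields are inspected, which simplifies how Flink analyses POJOs.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Flink: imitates the rule behind the
// ValidationException so a name clash can be spotted early.
final class ProctimeNameCheck {

    // Trimmed copy of the POJO from the post (fields only).
    static class OrderStream {
        public int id;
        public Long user;
        public String product;
        public int amount;
        public Long rowtime;
    }

    // Collects the names of all public, non-static fields of the given class.
    static Set<String> publicFieldNames(Class<?> pojo) {
        Set<String> names = new LinkedHashSet<>();
        for (Field f : pojo.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        return names;
    }

    // True when the chosen proctime attribute name is already a field name.
    static boolean collides(Class<?> pojo, String proctimeName) {
        return publicFieldNames(pojo).contains(proctimeName);
    }

    public static void main(String[] args) {
        System.out.println(collides(OrderStream.class, "rowtime"));  // prints "true"
        System.out.println(collides(OrderStream.class, "rowtime1")); // prints "false"
    }
}
```

A `true` result corresponds to the failing call with `$("rowtime").proctime()`; a `false` result corresponds to the working call with `$("rowtime1").proctime()`.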