The proctime attribute ‘rowtime‘ must not replace an existing field.

男娘i 2023-01-10 01:27 187阅读 0赞

故障代碼如下:

  1. // *************************************************************************
  2. // USER DATA TYPES
  3. // *************************************************************************
  4. /*
  5. * Simple POJO.
  6. */
  7. import java.sql.Timestamp;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. /**
  10. * Simple POJO.
  11. */
  12. public class OrderStream
  13. {
  14. public int id;
  15. public Long user;
  16. public String product;
  17. public int amount;
  18. public Long rowtime;
  19. public OrderStream()
  20. {
  21. }
  22. public OrderStream(int id,Long user,String product,int amount,Long rowtime)
  23. {
  24. this.id=id;
  25. this.user = user;
  26. this.product = product;
  27. this.amount = amount;
  28. this.rowtime = rowtime;
  29. }
  30. @Override
  31. public String toString()
  32. {
  33. return "Order{" +
  34. "id="+id+
  35. ", user=" + user +
  36. ", product='" + product + '\'' +
  37. ", amount=" + amount +
  38. ", ts=" + rowtime +
  39. '}';
  40. }
  41. }
  42. DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
  43. new OrderStream(1, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
  44. new OrderStream(2, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
  45. new OrderStream(3, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
  46. new OrderStream(4, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
  47. new OrderStream(5, 1L, "diaper", 4, 1505528400L),//2017-09-16 10:20:00
  48. new OrderStream(6, 1L, "diaper", 4, 1505528400L)//2017-09-16 10:20:00
  49. ));
  50. tEnv.createTemporaryView("Orders", orderA,$("id"),$("user"), $("product"), $("amount"), $("rowtime").proctime());

完整報錯如下:

  1. Exception in thread "main" org.apache.flink.table.api.ValidationException: The proctime attribute 'rowtime' must not replace an existing field.
  2. at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.validateProctimeDoesNotReplaceField(FieldInfoUtils.java:606)
  3. at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.createProctimeFieldInfo(FieldInfoUtils.java:599)
  4. at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:564)
  5. at org.apache.flink.table.typeutils.FieldInfoUtils$ExprToFieldInfo.visit(FieldInfoUtils.java:536)
  6. at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
  7. at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
  8. at org.apache.flink.table.typeutils.FieldInfoUtils.lambda$extractFieldInfosByNameReference$7(FieldInfoUtils.java:431)
  9. at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  10. at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
  11. at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
  12. at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
  13. at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
  14. at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  15. at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
  16. at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfosByNameReference(FieldInfoUtils.java:432)
  17. at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:266)
  18. at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:233)
  19. at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lambda$asQueryOperation$0(StreamTableEnvironmentImpl.java:384)
  20. at java.util.Optional.map(Optional.java:215)
  21. at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.java:383)
  22. at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:230)
  23. at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.createTemporaryView(StreamTableEnvironmentImpl.java:262)
  24. at DistinctAggregation4.main(DistinctAggregation4.java:35)

The proctime attribute ‘rowtime’ must not replace an existing field.

原因與解決方案:
















原因 解決方案
使用proctime,那麼列名不能是OrderStream中的 tEnv.createTemporaryView(“Orders”, orderA,$(“id”),$(“user”), $(“product”), $(“amount”), $(“rowtime1”).proctime());
使用rowtime,,列名可以是OrderStream中的.
  1. tEnv.createTemporaryView(“Orders”, orderA,$(“id”),$(“user”), $(“product”), $(“amount”), $(“rowtime”).rowtime());

发表评论

表情:
评论列表 (有 0 条评论,187人围观)

还没有评论,来说两句吧...

相关阅读