【以下的问题经过翻译处理】 我应该如何使用 KCL 处理 Java 中的实际记录?
我正在按照提供的指导 https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html,我可以连接到数据流,我可以得到可用记录数,但是该示例未显示的是如何实际获取记录(Json 字符串)。从示例中我可以看到我可以使用 r.data()
来获取记录的数据,它作为只读的 ByteBuffer
出现,我可以使用 StandardCharsets.US_ASCII.decode(r. data()).toString()
,但是生成的字符串肯定是经过编码的,我尝试进行 Base64 解码,但出现错误 java.lang.IllegalArgumentException: Illegal base64 character 3f
。那么获取有效载荷的最简单方法是什么?下面是我的 processRecords
方法:
public void processRecords(ProcessRecordsInput processRecordsInput) {
try {
System.out.println("Processing " + processRecordsInput.records().size() + " record(s)");
processRecordsInput.records().forEach((r) -> {
try {
Decoder dec = Base64.getDecoder();
String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();
byte[] bt = dec.decode(myString);
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Throwable t) {
System.out.println("Caught throwable while processing records. Aborting.");
Runtime.getRuntime().halt(1);
} finally {
}
}
从这里我可以得到“myString”,但是当我到达“bt”时,我得到了显示的异常。我还没有找到解释如何获取记录的单一资源。我使用aws kinesis put-record --stream-name testStream --partition-key 1 --data {"somedata":"This Data"}
将记录发布到kinesis