使用KCL和Java协助处理Kinesis Records的帮助

0

【以下的问题经过翻译处理】 我应该如何使用 KCL 处理 Java 中的实际记录?

我正在按照提供的指导 https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html,我可以连接到数据流,我可以得到可用记录数,但是该示例未显示的是如何实际获取记录(Json 字符串)。从示例中我可以看到我可以使用 r.data() 来获取记录的数据,它作为只读的 ByteBuffer 出现,我可以使用 StandardCharsets.US_ASCII.decode(r. data()).toString(),但是生成的字符串肯定是经过编码的,我尝试进行 Base64 解码,但出现错误 java.lang.IllegalArgumentException: Illegal base64 character 3f。那么获取有效载荷的最简单方法是什么?下面是我的 processRecords 方法:

public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
        System.out.println("Processing " + processRecordsInput.records().size() + " record(s)");
        processRecordsInput.records().forEach((r) -> {
            try {
                Decoder dec = Base64.getDecoder();
                String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();
                byte[] bt = dec.decode(myString);
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    } catch (Throwable t) {
        System.out.println("Caught throwable while processing records. Aborting.");
        Runtime.getRuntime().halt(1);
    } finally {
    }
}

从这里我可以得到“myString”,但是当我到达“bt”时,我得到了显示的异常。我还没有找到解释如何获取记录的单一资源。我使用aws kinesis put-record --stream-name testStream --partition-key 1 --data {"somedata":"This Data"}将记录发布到kinesis

profile picture
EXPERTE
gefragt vor 6 Monaten68 Aufrufe
1 Antwort
0

【以下的回答经过翻译处理】 问题出在生产者没有生成正确的数据,使用String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();可以得到预期的字符串。

profile picture
EXPERTE
beantwortet vor 6 Monaten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen