मुझे निम्न त्रुटि का सामना करना पड़ रहा है। संगम मंच में neo4j सिंक कनेक्टर जोड़ते समय।

इसके कारण: org.apache.kafka.connect.errors.DataException: JsonConverter के साथ schemas.enable को "स्कीमा" और "पेलोड" फ़ील्ड की आवश्यकता होती है और इसमें अतिरिक्त फ़ील्ड नहीं हो सकते हैं। यदि आप सादे JSON डेटा को डीसेरिएलाइज़ करने का प्रयास कर रहे हैं, तो अपने कनवर्टर कॉन्फ़िगरेशन में schemas.enable=false सेट करें। org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359) पर

कृपया नीचे दिए गए कॉन्फ़िगरेशन भाग की जाँच करें:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{

    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"

}'

इस प्रकार अद्यतन परिवर्तन: कर्ल-एक्स पुट http://localhost:8083/connectors/Neo4j-Sink- कनेक्ट-बुक/कॉन्फ़िगरेशन -H "Content-Type: application/json" -d '{

"topics": "pulseTest.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter.schema.registry.url": "http://localhost:8081", 
"value.converter.schema.registry.url": "http://localhost:8081", 
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"true",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cdc.schema": "pulseTest.bookneo"

}'

अब इस मुद्दे का सामना करना पड़ रहा है:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulseTest.bookneo to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

अब, मेरा कनेक्टर ठीक से चल रहा है लेकिन neo4j में नोड खाली बना दिया गया है।

कर्ल-एक्स पुट http://localhost:8083/connectors/Neo4j-Sink- Connect-projectp/config -H "Content-Type: application/json" -d '{

"topics": "projectsp",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"errors.tolerance": "all",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.projectsp": " MERGE (p:projects{projectname:coalesce(event.projectname,0),count:coalesce(event.count,0)})  "

}'

यह मेरी साइबर क्वेरी है: "मर्ज (पी: प्रोजेक्ट्स {प्रोजेक्टनाम: कोलेस (इवेंट। प्रोजेक्टनाम, 0)), गिनती: कोलेस (इवेंट। गिनती, 0)})"

मैंने विषय से आने वाले डेटा को संलग्न किया है

0
user3694114 18 फरवरी 2020, 11:49
आपकी त्रुटि neo4j के लिए विशिष्ट नहीं है। इसके लिए आपने क्या शोध किया?
 – 
OneCricketeer
18 फरवरी 2020, 13:20
मैं neo4j सिंक कनेक्टर्स के लिए कॉन्फ्लुएंट पर कॉन्फ़िगरेशन जोड़ रहा हूं।
 – 
user3694114
27 फरवरी 2020, 12:51

2 जवाब

आप अपने key में भी JsonConverter का उपयोग कर रहे हैं, इसलिए वहां भी schemas.enable निर्दिष्ट करने की आवश्यकता है:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"

}'

कन्वर्टर्स के बारे में अधिक जानने के लिए देखें https://www .confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

1
Robin Moffatt 18 फरवरी 2020, 12:55

अज्ञात जादू बाइट!

आपके विषय में कंफ्लुएंट क्लाइंट द्वारा निर्मित एवरो डेटा नहीं है, इसलिए डिसेरिएलाइज़र विफल हो जाता है

आप काफ्का एवरो कंसोल उपभोक्ता का उपयोग करके उसी चीज़ को सत्यापित कर सकते हैं

0
OneCricketeer 27 फरवरी 2020, 17:10