मुझे निम्न त्रुटि का सामना करना पड़ रहा है। संगम मंच में neo4j सिंक कनेक्टर जोड़ते समय।
इसके कारण: org.apache.kafka.connect.errors.DataException: JsonConverter के साथ schemas.enable को "स्कीमा" और "पेलोड" फ़ील्ड की आवश्यकता होती है और इसमें अतिरिक्त फ़ील्ड नहीं हो सकते हैं। यदि आप सादे JSON डेटा को डीसेरिएलाइज़ करने का प्रयास कर रहे हैं, तो अपने कनवर्टर कॉन्फ़िगरेशन में schemas.enable=false सेट करें। org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359) पर
कृपया नीचे दिए गए कॉन्फ़िगरेशन भाग की जाँच करें:
curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
"topics": "pls.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "*****"
}'
इस प्रकार अद्यतन परिवर्तन: कर्ल-एक्स पुट http://localhost:8083/connectors/Neo4j-Sink- कनेक्ट-बुक/कॉन्फ़िगरेशन -H "Content-Type: application/json" -d '{
"topics": "pulseTest.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"true",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cdc.schema": "pulseTest.bookneo"
}'
अब इस मुद्दे का सामना करना पड़ रहा है:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulseTest.bookneo to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
अब, मेरा कनेक्टर ठीक से चल रहा है लेकिन neo4j में नोड खाली बना दिया गया है।
कर्ल-एक्स पुट http://localhost:8083/connectors/Neo4j-Sink- Connect-projectp/config -H "Content-Type: application/json" -d '{
"topics": "projectsp",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"errors.tolerance": "all",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.projectsp": " MERGE (p:projects{projectname:coalesce(event.projectname,0),count:coalesce(event.count,0)}) "
}'
यह मेरी साइबर क्वेरी है: "मर्ज (पी: प्रोजेक्ट्स {प्रोजेक्टनाम: कोलेस (इवेंट। प्रोजेक्टनाम, 0)), गिनती: कोलेस (इवेंट। गिनती, 0)})"
मैंने विषय से आने वाले डेटा को संलग्न किया है
2 जवाब
आप अपने key
में भी JsonConverter का उपयोग कर रहे हैं, इसलिए वहां भी schemas.enable
निर्दिष्ट करने की आवश्यकता है:
curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
"topics": "pls.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "*****"
}'
कन्वर्टर्स के बारे में अधिक जानने के लिए देखें https://www .confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
अज्ञात जादू बाइट!
आपके विषय में कंफ्लुएंट क्लाइंट द्वारा निर्मित एवरो डेटा नहीं है, इसलिए डिसेरिएलाइज़र विफल हो जाता है
आप काफ्का एवरो कंसोल उपभोक्ता का उपयोग करके उसी चीज़ को सत्यापित कर सकते हैं
संबंधित सवाल
नए सवाल
neo4j
Neo4j एक ओपन-सोर्स ग्राफ़ डेटाबेस (GDB) है जो कनेक्टेड डेटा के अनुकूल है। प्रश्न पूछते समय कृपया Neo4j के अपने सटीक संस्करण का उल्लेख करें। आप इसका उपयोग अनुशंसा इंजन, धोखाधड़ी का पता लगाने, ग्राफ़-आधारित खोज, नेटवर्क ऑप्स / सुरक्षा और कई अन्य उपयोगकर्ता मामलों के लिए कर सकते हैं। डेटाबेस जावा, जावास्क्रिप्ट, पायथन और .NET में आधिकारिक ड्राइवरों, या पीएचपी, रूबी, आर, गोलांग, एलिक्सिर, स्विफ्ट और अधिक में सामुदायिक-योगदान वाले ड्राइवरों के माध्यम से पहुँचा है।