Skip to content

Commit b809df5

Browse files
committed
Default to BYTES schema if JSON cannot be constructed for the message type
1 parent fb83a7b commit b809df5

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.pulsar.common.schema.SchemaType;
4040

4141
import org.springframework.core.ResolvableType;
42+
import org.springframework.core.log.LogAccessor;
4243
import org.springframework.lang.Nullable;
4344

4445
import com.google.protobuf.GeneratedMessageV3;
@@ -56,6 +57,8 @@
5657
*/
5758
public class DefaultSchemaResolver implements SchemaResolver {
5859

60+
private final LogAccessor logger = new LogAccessor(this.getClass());
61+
5962
private static final Map<Class<?>, Schema<?>> BASE_SCHEMA_MAPPINGS = new HashMap<>();
6063
static {
6164
BASE_SCHEMA_MAPPINGS.put(byte[].class, Schema.BYTES);
@@ -129,10 +132,20 @@ public <T> Schema<T> getSchema(Class<?> messageClass, boolean returnDefault) {
129132
}
130133

131134
@Nullable
132-
private Schema<?> getCustomSchemaOrMaybeDefault(Class<?> messageClass, boolean returnDefault) {
135+
protected Schema<?> getCustomSchemaOrMaybeDefault(Class<?> messageClass, boolean returnDefault) {
133136
Schema<?> schema = this.customSchemaMappings.get(messageClass);
134137
if (schema == null && returnDefault) {
135-
return messageClass != null ? Schema.JSON(messageClass) : Schema.BYTES;
138+
if (messageClass != null) {
139+
try {
140+
return Schema.JSON(messageClass);
141+
}
142+
catch (Exception e) {
143+
if (logger.isDebugEnabled()) {
144+
logger.debug(e, "Failed to create JSON schema for " + messageClass.getName());
145+
}
146+
}
147+
}
148+
return Schema.BYTES;
136149
}
137150
return schema;
138151
}

0 commit comments

Comments
 (0)