Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
service-campaign
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nguyen Ha
service-campaign
Commits
433be62c
Commit
433be62c
authored
Aug 14, 2019
by
Vu Duy Anh
Browse files
Options
Browse Files
Download
Plain Diff
anhvd commit merge code
parents
40f1295f
1799eb5e
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
254 additions
and
252 deletions
+254
-252
src/main/java/com/viettel/campaign/config/ReceiverConfig.java
...main/java/com/viettel/campaign/config/ReceiverConfig.java
+54
-54
src/main/java/com/viettel/campaign/config/SenderConfig.java
src/main/java/com/viettel/campaign/config/SenderConfig.java
+48
-48
src/main/java/com/viettel/campaign/filter/CorsFilter.java
src/main/java/com/viettel/campaign/filter/CorsFilter.java
+22
-20
src/main/java/com/viettel/campaign/service/Consumer.java
src/main/java/com/viettel/campaign/service/Consumer.java
+17
-17
src/main/java/com/viettel/campaign/service/Producer.java
src/main/java/com/viettel/campaign/service/Producer.java
+30
-30
src/main/java/com/viettel/campaign/service/Receiver.java
src/main/java/com/viettel/campaign/service/Receiver.java
+19
-19
src/main/java/com/viettel/campaign/service/Sender.java
src/main/java/com/viettel/campaign/service/Sender.java
+18
-18
src/main/java/com/viettel/campaign/utils/Config.java
src/main/java/com/viettel/campaign/utils/Config.java
+2
-2
src/main/java/com/viettel/campaign/web/rest/KafkaController.java
...n/java/com/viettel/campaign/web/rest/KafkaController.java
+44
-44
No files found.
src/main/java/com/viettel/campaign/config/ReceiverConfig.java
View file @
433be62c
package
com.viettel.campaign.config
;
import
com.viettel.campaign.service.Receiver
;
import
org.springframework.context.annotation.Configuration
;
import
java.util.HashMap
;
import
java.util.Map
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
;
import
org.springframework.kafka.core.ConsumerFactory
;
import
org.springframework.kafka.core.DefaultKafkaConsumerFactory
;
import
org.springframework.kafka.support.serializer.JsonDeserializer
;
/**
* @author hanv_itsol
* @project campaign
*/
@Configuration
public
class
ReceiverConfig
{
// @Value("${spring.kafka.bootstrap-servers}")
private
String
bootstrapServers
=
"192.168.1.201:9092"
;
@Bean
public
Map
<
String
,
Object
>
consumerConfigs
()
{
Map
<
String
,
Object
>
props
=
new
HashMap
<>();
props
.
put
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
props
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
props
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
JsonDeserializer
.
class
);
props
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
"hanv"
);
return
props
;
}
@Bean
public
ConsumerFactory
<
String
,
String
>
consumerFactory
()
{
return
new
DefaultKafkaConsumerFactory
<>(
consumerConfigs
(),
new
StringDeserializer
(),
new
JsonDeserializer
<>());
}
@Bean
public
ConcurrentKafkaListenerContainerFactory
<
String
,
String
>
kafkaListenerContainerFactory
()
{
ConcurrentKafkaListenerContainerFactory
<
String
,
String
>
factory
=
new
ConcurrentKafkaListenerContainerFactory
<>();
factory
.
setConsumerFactory
(
consumerFactory
());
return
factory
;
}
@Bean
public
Receiver
receiver
()
{
return
new
Receiver
();
}
}
//
package com.viettel.campaign.config;
//
//
import com.viettel.campaign.service.Receiver;
//
import org.springframework.context.annotation.Configuration;
//
import java.util.HashMap;
//
import java.util.Map;
//
//
import org.apache.kafka.clients.consumer.ConsumerConfig;
//
import org.apache.kafka.common.serialization.StringDeserializer;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
//
import org.springframework.kafka.core.ConsumerFactory;
//
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
//
import org.springframework.kafka.support.serializer.JsonDeserializer;
//
/
//
**
//
* @author hanv_itsol
//
* @project campaign
//
*/
//
@Configuration
//
public class ReceiverConfig {
//
//
@Value("${spring.kafka.bootstrap-servers}")
//
private String bootstrapServers = "192.168.1.201:9092";
//
//
@Bean
//
public Map<String, Object> consumerConfigs() {
//
Map<String, Object> props = new HashMap<>();
//
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
//
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hanv");
//
return props;
//
}
//
//
@Bean
//
public ConsumerFactory<String, String> consumerFactory() {
//
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
//
new JsonDeserializer<>());
//
}
//
//
@Bean
//
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
//
ConcurrentKafkaListenerContainerFactory<String, String> factory =
//
new ConcurrentKafkaListenerContainerFactory<>();
//
factory.setConsumerFactory(consumerFactory());
//
//
return factory;
//
}
//
//
@Bean
//
public Receiver receiver() {
//
return new Receiver();
//
}
//
}
src/main/java/com/viettel/campaign/config/SenderConfig.java
View file @
433be62c
package
com.viettel.campaign.config
;
import
java.util.HashMap
;
import
java.util.Map
;
import
com.viettel.campaign.service.Sender
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.core.DefaultKafkaProducerFactory
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.core.ProducerFactory
;
import
org.springframework.kafka.support.serializer.JsonSerializer
;
/**
* @author hanv_itsol
* @project campaign
*/
@Configuration
public
class
SenderConfig
{
private
String
bootstrapServers
=
"192.168.1.201:9092"
;
@Bean
public
Map
<
String
,
Object
>
producerConfigs
()
{
Map
<
String
,
Object
>
props
=
new
HashMap
<>();
props
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
props
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
props
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
JsonSerializer
.
class
);
return
props
;
}
@Bean
public
ProducerFactory
<
String
,
String
>
producerFactory
()
{
return
new
DefaultKafkaProducerFactory
<>(
producerConfigs
());
}
@Bean
public
KafkaTemplate
<
String
,
String
>
simpleKafkaTemplate
()
{
return
new
KafkaTemplate
<>(
producerFactory
());
}
@Bean
public
Sender
sender
()
{
return
new
Sender
();
}
}
//
package com.viettel.campaign.config;
//
//
import java.util.HashMap;
//
import java.util.Map;
//
//
import com.viettel.campaign.service.Sender;
//
import org.apache.kafka.clients.producer.ProducerConfig;
//
import org.apache.kafka.common.serialization.StringSerializer;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
//
import org.springframework.kafka.core.KafkaTemplate;
//
import org.springframework.kafka.core.ProducerFactory;
//
import org.springframework.kafka.support.serializer.JsonSerializer;
//
/
//
**
//
* @author hanv_itsol
//
* @project campaign
//
*/
//
@Configuration
//
public class SenderConfig {
//
private String bootstrapServers = "192.168.1.201:9092";
//
//
@Bean
//
public Map<String, Object> producerConfigs() {
//
Map<String, Object> props = new HashMap<>();
//
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//
//
return props;
//
}
//
//
@Bean
//
public ProducerFactory<String, String> producerFactory() {
//
return new DefaultKafkaProducerFactory<>(producerConfigs());
//
}
//
//
@Bean
//
public KafkaTemplate<String, String> simpleKafkaTemplate() {
//
return new KafkaTemplate<>(producerFactory());
//
}
//
//
@Bean
//
public Sender sender() {
//
return new Sender();
//
}
//
}
src/main/java/com/viettel/campaign/filter/CorsFilter.java
View file @
433be62c
...
...
@@ -30,26 +30,28 @@ public class CorsFilter implements Filter {
HttpServletRequest
request
=
(
HttpServletRequest
)
req
;
if
(
"OPTIONS"
.
equalsIgnoreCase
(
request
.
getMethod
()))
{
chain
.
doFilter
(
req
,
resp
);
return
;
}
logger
.
info
(
"uri: "
+
request
.
getRequestURI
());
if
(
"/"
.
equals
(
request
.
getRequestURI
()))
{
chain
.
doFilter
(
req
,
resp
);
return
;
}
String
xAuthToken
=
request
.
getHeader
(
"X-Auth-Token"
);
if
(
xAuthToken
==
null
||
""
.
equals
(
xAuthToken
))
{
response
.
sendError
(
HttpServletResponse
.
SC_UNAUTHORIZED
,
"The token is null."
);
return
;
}
Object
obj
=
RedisUtil
.
getInstance
().
get
(
xAuthToken
);
if
(
obj
instanceof
UserSession
)
{
chain
.
doFilter
(
req
,
resp
);
}
else
{
response
.
sendError
(
HttpServletResponse
.
SC_UNAUTHORIZED
,
"The token is invalid."
);
}
chain
.
doFilter
(
req
,
resp
);
// if ("OPTIONS".equalsIgnoreCase(request.getMethod())) {
// chain.doFilter(req, resp);
// return;
// }
// logger.info("uri: "+ request.getRequestURI());
// if ("/".equals(request.getRequestURI())) {
// chain.doFilter(req, resp);
// return;
// }
// String xAuthToken = request.getHeader("X-Auth-Token");
// if (xAuthToken == null || "".equals(xAuthToken)) {
// response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "The token is null.");
// return;
// }
// Object obj = RedisUtil.getInstance().get(xAuthToken);
// if (obj instanceof UserSession) {
// chain.doFilter(req, resp);
// } else {
// response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "The token is invalid.");
// }
}
@Override
...
...
src/main/java/com/viettel/campaign/service/Consumer.java
View file @
433be62c
package
com.viettel.campaign.service
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Service
;
/**
* @author hanv_itsol
* @project service-campaign
*/
@Service
public
class
Consumer
{
@KafkaListener
(
topics
=
"TestTopic"
,
groupId
=
"1001"
)
public
void
consume
(
String
message
){
System
.
out
.
println
(
"Consumed Message: "
+
message
);
}
}
//
package com.viettel.campaign.service;
//
//
import org.springframework.kafka.annotation.KafkaListener;
//
import org.springframework.stereotype.Component;
//
import org.springframework.stereotype.Service;
//
/
//
**
//
* @author hanv_itsol
//
* @project service-campaign
//
*/
//
@Service
//
public class Consumer {
//
@KafkaListener(topics = "TestTopic", groupId = "1001")
//
public void consume(String message){
//
System.out.println("Consumed Message: " + message);
//
}
//
}
src/main/java/com/viettel/campaign/service/Producer.java
View file @
433be62c
package
com.viettel.campaign.service
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.stereotype.Service
;
/**
* @author hanv_itsol
* @project service-campaign
*/
@Service
public
class
Producer
{
private
static
final
String
TOPIC
=
"TestTopic"
;
private
static
final
String
TOPIC2
=
"TestTopic2"
;
@Autowired
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
public
void
sendMessage
(
String
message
){
this
.
kafkaTemplate
.
send
(
TOPIC
,
"key1"
,
message
);
}
public
void
sendMessageTopic2
(
String
message
){
this
.
kafkaTemplate
.
send
(
TOPIC2
,
"key2"
,
message
);
}
}
//
package com.viettel.campaign.service;
//
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.kafka.core.KafkaTemplate;
//
import org.springframework.stereotype.Service;
//
/
//
**
//
* @author hanv_itsol
//
* @project service-campaign
//
*/
//
//
@Service
//
public class Producer {
//
//
private static final String TOPIC = "TestTopic";
//
private static final String TOPIC2 = "TestTopic2";
//
//
//
@Autowired
//
private KafkaTemplate<String, String> kafkaTemplate;
//
//
public void sendMessage(String message){
//
this.kafkaTemplate.send(TOPIC, "key1", message);
//
}
//
//
public void sendMessageTopic2(String message){
//
this.kafkaTemplate.send(TOPIC2, "key2", message);
//
}
//
//
}
src/main/java/com/viettel/campaign/service/Receiver.java
View file @
433be62c
package
com.viettel.campaign.service
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
java.util.concurrent.CountDownLatch
;
/**
* @author hanv_itsol
* @project campaign
*/
public
class
Receiver
{
private
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
@KafkaListener
(
topics
=
"hanv"
)
public
void
receive
(
String
payload
)
{
latch
.
countDown
();
}
}
//
package com.viettel.campaign.service;
//
//
import org.springframework.kafka.annotation.KafkaListener;
//
//
import java.util.concurrent.CountDownLatch;
//
/
//
**
//
* @author hanv_itsol
//
* @project campaign
//
*/
//
public class Receiver {
//
//
private CountDownLatch latch = new CountDownLatch(1);
//
//
@KafkaListener(topics = "hanv")
//
public void receive(String payload) {
//
latch.countDown();
//
}
//
}
src/main/java/com/viettel/campaign/service/Sender.java
View file @
433be62c
package
com.viettel.campaign.service
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.core.KafkaTemplate
;
/**
* @author hanv_itsol
* @project campaign
*/
public
class
Sender
{
@Autowired
private
KafkaTemplate
<
String
,
String
>
simpleKafkaTemplate
;
public
void
send
(
String
topic
,
String
payload
)
{
simpleKafkaTemplate
.
send
(
topic
,
payload
);
}
}
//
package com.viettel.campaign.service;
//
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.kafka.core.KafkaTemplate;
//
/
//
**
//
* @author hanv_itsol
//
* @project campaign
//
*/
//
public class Sender {
//
//
@Autowired
//
private KafkaTemplate<String, String> simpleKafkaTemplate;
//
//
public void send(String topic, String payload) {
//
simpleKafkaTemplate.send(topic, payload);
//
}
//
}
src/main/java/com/viettel/campaign/utils/Config.java
View file @
433be62c
package
com.viettel.campaign.utils
;
import
com.viettel.security.PassTranformer
;
//
import com.viettel.security.PassTranformer;
import
org.apache.log4j.Logger
;
import
java.io.File
;
...
...
@@ -60,7 +60,7 @@ public class Config {
}
catch
(
IOException
ex
)
{
Logger
.
getLogger
(
Config
.
class
.
getName
()).
error
(
ex
.
getMessage
(),
ex
);
}
PassTranformer
.
setInputKey
(
"Ipcc#987654321#@!"
);
//
PassTranformer.setInputKey("Ipcc#987654321#@!");
// rabbitConnection = properties.getProperty("rabbit_connection_string");
// fbGatewayUser = PassTranformer.decrypt(properties.getProperty("rabbit_user", "").trim());
...
...
src/main/java/com/viettel/campaign/web/rest/KafkaController.java
View file @
433be62c
package
com.viettel.campaign.web.rest
;
import
com.viettel.campaign.service.Sender
;
import
com.viettel.campaign.service.Producer
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
/**
* @author hanv_itsol
* @project service-campaign
*/
@Slf4j
@RestController
@RequestMapping
(
value
=
"/kafka"
)
public
class
KafkaController
{
private
final
Sender
sender
;
private
final
Producer
producer
;
@Autowired
KafkaController
(
Sender
sender
,
Producer
producer
)
{
this
.
sender
=
sender
;
this
.
producer
=
producer
;
}
@PostMapping
(
value
=
"/publish"
)
public
void
sendMessageToKafkaTopic
(
@RequestParam
(
"message"
)
String
message
)
{
log
.
info
(
"message: "
+
message
);
this
.
producer
.
sendMessage
(
message
);
}
@PostMapping
(
value
=
"/publish2"
)
public
void
sendMessageToKafkaTopic2
(
@RequestParam
(
"message"
)
String
message
)
{
log
.
info
(
"message: "
+
message
);
this
.
producer
.
sendMessageTopic2
(
message
);
}
@GetMapping
(
value
=
"/test"
)
public
void
test
(){
sender
.
send
(
"hanv"
,
"haha"
);
}
}
//
package com.viettel.campaign.web.rest;
//
//
import com.viettel.campaign.service.Sender;
//
import com.viettel.campaign.service.Producer;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.web.bind.annotation.*;
//
/
//
**
//
* @author hanv_itsol
//
* @project service-campaign
//
*/
//
@Slf4j
//
@RestController
//
@RequestMapping(value = "/kafka")
//
public class KafkaController {
//
//
private final Sender sender;
//
//
private final Producer producer;
//
//
@Autowired
//
KafkaController(Sender sender, Producer producer) {
//
this.sender = sender;
//
this.producer = producer;
//
}
//
//
@PostMapping(value = "/publish")
//
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
//
log.info("message: " + message);
//
this.producer.sendMessage(message);
//
}
//
//
@PostMapping(value = "/publish2")
//
public void sendMessageToKafkaTopic2(@RequestParam("message") String message) {
//
log.info("message: " + message);
//
this.producer.sendMessageTopic2(message);
//
}
//
//
@GetMapping(value = "/test")
//
public void test(){
//
sender.send("hanv", "haha");
//
}
//
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment