Compare commits
1 Commits
main
...
feature/me
Author | SHA1 | Date | |
---|---|---|---|
dd56145418 |
56
.drone.yml
56
.drone.yml
@ -1,36 +1,30 @@
|
|||||||
type: docker
|
|
||||||
kind: pipeline
|
kind: pipeline
|
||||||
name: Serverctl
|
name: Serverctl
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: test
|
- name: terraform plan
|
||||||
image: harbor.front.kjuulh.io/docker-proxy/library/bash:latest
|
image: alpine
|
||||||
|
environment:
|
||||||
|
HCLOUD_TOKEN:
|
||||||
|
from_secret: serverctl_hcloud_token
|
||||||
|
ACCESS_KEY:
|
||||||
|
from_secret: serverctl_access_key
|
||||||
|
SECRET_KEY:
|
||||||
|
from_secret: serverctl_secret_key
|
||||||
|
SSH_ZIP_KEY:
|
||||||
|
from_secret: serverctl_ssh_zip_key
|
||||||
|
HCLOUD_SSH_KEY_ID:
|
||||||
|
from_secret: serverctl_hcloud_ssh_key_id
|
||||||
commands:
|
commands:
|
||||||
- echo 'Run tests'
|
- apk --update add curl zip ansible python3
|
||||||
#
|
- cd infrastructure && ./unzip-ssh-keys.sh "$SSH_ZIP_KEY" && cd ..
|
||||||
# - name: terraform plan
|
- curl --silent --output terraform.zip "https://releases.hashicorp.com/terraform/1.1.6/terraform_1.1.6_linux_amd64.zip"
|
||||||
# image: alpine
|
- unzip terraform.zip ; rm -f terraform.zip; chmod +x terraform
|
||||||
# environment:
|
- mkdir -p ${HOME}/bin ; export PATH=${PATH}:${HOME}/bin; mv terraform ${HOME}/bin/
|
||||||
# HCLOUD_TOKEN:
|
- terraform -v
|
||||||
# from_secret: serverctl_hcloud_token
|
- cd infrastructure/create-resources
|
||||||
# ACCESS_KEY:
|
- terraform init -backend-config="access_key=$ACCESS_KEY" -backend-config="secret_key=$SECRET_KEY"
|
||||||
# from_secret: serverctl_access_key
|
- terraform validate
|
||||||
# SECRET_KEY:
|
- terraform apply -auto-approve -var "hcloud_token=$HCLOUD_TOKEN" -var "pvt_key=../ssh_keys/id_ed25519" -var "pub_key=../ssh_keys/id_ed25519.pub" -var "hcloud_serverctl_ssh_key_id=$HCLOUD_SSH_KEY_ID"
|
||||||
# from_secret: serverctl_secret_key
|
- cd ansible
|
||||||
# SSH_ZIP_KEY:
|
- ANSIBLE_HOST_KEY_CHECKING=False /usr/bin/ansible-playbook -u root --key-file '../../ssh_keys/id_ed25519' -e 'pub_key=../../ssh_keys/id_ed25519.pub' site.yml
|
||||||
# from_secret: serverctl_ssh_zip_key
|
|
||||||
# HCLOUD_SSH_KEY_ID:
|
|
||||||
# from_secret: serverctl_hcloud_ssh_key_id
|
|
||||||
# commands:
|
|
||||||
# - apk --update add curl zip ansible python3
|
|
||||||
# - cd infrastructure && ./unzip-ssh-keys.sh "$SSH_ZIP_KEY" && cd ..
|
|
||||||
# - curl --silent --output terraform.zip "https://releases.hashicorp.com/terraform/1.1.6/terraform_1.1.6_linux_amd64.zip"
|
|
||||||
# - unzip terraform.zip ; rm -f terraform.zip; chmod +x terraform
|
|
||||||
# - mkdir -p ${HOME}/bin ; export PATH=${PATH}:${HOME}/bin; mv terraform ${HOME}/bin/
|
|
||||||
# - terraform -v
|
|
||||||
# - cd infrastructure/create-resources
|
|
||||||
# - terraform init -backend-config="access_key=$ACCESS_KEY" -backend-config="secret_key=$SECRET_KEY"
|
|
||||||
# - terraform validate
|
|
||||||
# - terraform apply -auto-approve -var "hcloud_token=$HCLOUD_TOKEN" -var "pvt_key=../ssh_keys/id_ed25519" -var "pub_key=../ssh_keys/id_ed25519.pub" -var "hcloud_serverctl_ssh_key_id=$HCLOUD_SSH_KEY_ID"
|
|
||||||
# - cd ansible
|
|
||||||
# - ANSIBLE_HOST_KEY_CHECKING=False /usr/bin/ansible-playbook -u root --key-file '../../ssh_keys/id_ed25519' -e 'pub_key=../../ssh_keys/id_ed25519.pub' site.yml
|
|
||||||
|
@ -52,10 +52,29 @@ services:
|
|||||||
logging: *loki-logging
|
logging: *loki-logging
|
||||||
depends_on:
|
depends_on:
|
||||||
- db_migrator
|
- db_migrator
|
||||||
|
- rabbitmq
|
||||||
|
|
||||||
|
# Messaging
|
||||||
|
rabbitmq:
|
||||||
|
image: docker.io/bitnami/rabbitmq:latest
|
||||||
|
ports:
|
||||||
|
- '4369:4369'
|
||||||
|
- '5551:5551'
|
||||||
|
- '5552:5552'
|
||||||
|
- '5672:5672'
|
||||||
|
- '25672:25672'
|
||||||
|
- '15672:15672'
|
||||||
|
networks:
|
||||||
|
- back-tier
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_USERNAME=serverctl
|
||||||
|
- RABBITMQ_PASSWORD=serverctlsecret
|
||||||
|
volumes:
|
||||||
|
- 'rabbitmq_data:/bitnami/rabbitmq/mnesia'
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
loki:
|
loki:
|
||||||
image: grafana/loki:2.7.0
|
image: grafana/loki:2.4.2
|
||||||
ports:
|
ports:
|
||||||
- 3100
|
- 3100
|
||||||
networks:
|
networks:
|
||||||
@ -66,7 +85,7 @@ services:
|
|||||||
logging: *loki-logging
|
logging: *loki-logging
|
||||||
|
|
||||||
promtail:
|
promtail:
|
||||||
image: grafana/promtail:2.7.0
|
image: grafana/promtail:2.4.2
|
||||||
volumes:
|
volumes:
|
||||||
- ./services/logs/promtail/config.yaml:/mnt/config/promtail-config.yaml
|
- ./services/logs/promtail/config.yaml:/mnt/config/promtail-config.yaml
|
||||||
- /var/lib/docker/containers:/host/containers
|
- /var/lib/docker/containers:/host/containers
|
||||||
@ -114,3 +133,4 @@ volumes:
|
|||||||
db_data: {}
|
db_data: {}
|
||||||
prometheus_data: {}
|
prometheus_data: {}
|
||||||
grafana_data: {}
|
grafana_data: {}
|
||||||
|
rabbitmq_data: {}
|
||||||
|
@ -2,14 +2,6 @@
|
|||||||
k3s_version: v1.22.3+k3s1
|
k3s_version: v1.22.3+k3s1
|
||||||
ansible_user: root
|
ansible_user: root
|
||||||
systemd_dir: /etc/systemd/system
|
systemd_dir: /etc/systemd/system
|
||||||
systemd_network_dir: /etc/systemd/network
|
master_ip: "{{ hostvars[groups['serverctl_master_hosts'][0]]['ansible_host'] | default(groups['serverctl_master_hosts'][0]) }}"
|
||||||
master_ip: "{{ hostvars[groups['serverctl_master_hosts'][0]]['wireguard_ip'] | default(groups['serverctl_master_hosts'][0]) }}"
|
extra_server_args: ""
|
||||||
extra_server_args: "--flannel-iface=serverctl-wg0"
|
extra_agent_args: ""
|
||||||
extra_agent_args: "--flannel-iface=serverctl-wg0"
|
|
||||||
|
|
||||||
ansible_become_method: su
|
|
||||||
|
|
||||||
ufw_enabled: true
|
|
||||||
|
|
||||||
wireguard_mask_bits: 24
|
|
||||||
wireguard_port: 51871
|
|
||||||
|
@ -1,32 +0,0 @@
|
|||||||
[serverctl_master_hosts]
|
|
||||||
95.217.155.228 ansible_host=95.217.155.228 wireguard_ip=10.1.1.1
|
|
||||||
|
|
||||||
[serverctl_node_hosts]
|
|
||||||
65.21.50.146 ansible_host=65.21.50.146 wireguard_ip=10.1.1.10
|
|
||||||
95.216.162.16 ansible_host=95.216.162.16 wireguard_ip=10.1.1.11
|
|
||||||
|
|
||||||
[serverctl_home_servers]
|
|
||||||
192.168.1.150 ansible_host=192.168.1.150 wireguard_ip=10.1.1.8
|
|
||||||
#192.168.1.233 ansible_host=192.168.1.233 wireguard_ip=10.1.1.9
|
|
||||||
|
|
||||||
[serverctl_cluster:children]
|
|
||||||
serverctl_master_hosts
|
|
||||||
serverctl_node_hosts
|
|
||||||
|
|
||||||
[serverctl_super_cluster:children]
|
|
||||||
serverctl_cluster
|
|
||||||
serverctl_home_servers
|
|
||||||
|
|
||||||
[serverctl_home_servers:vars]
|
|
||||||
client_server=True
|
|
||||||
|
|
||||||
[serverctl_super_cluster:vars]
|
|
||||||
pipelining=true
|
|
||||||
ansible_ssh_user=root
|
|
||||||
ansible_ssh_port=22
|
|
||||||
|
|
||||||
[serverctl_cluster:vars]
|
|
||||||
client_server=False
|
|
||||||
pipelining=true
|
|
||||||
ansible_ssh_user=root
|
|
||||||
ansible_ssh_port=22
|
|
@ -1,8 +0,0 @@
|
|||||||
- hosts: serverctl_master_hosts[0]
|
|
||||||
become: yes
|
|
||||||
tasks:
|
|
||||||
- name: Fetch kubeconfig
|
|
||||||
ansible.builtin.fetch:
|
|
||||||
src: ~/.kube/config
|
|
||||||
dest: temp/.kube/config
|
|
||||||
|
|
@ -1,7 +0,0 @@
|
|||||||
---
|
|
||||||
- hosts: serverctl_super_cluster
|
|
||||||
gather_facts: yes
|
|
||||||
tasks:
|
|
||||||
- name: ping
|
|
||||||
command: "ping -c3 {{ hostvars[item].wireguard_ip}}"
|
|
||||||
with_items: "{{groups['all']}}"
|
|
@ -1,67 +0,0 @@
|
|||||||
---
|
|
||||||
- name: update packages
|
|
||||||
apt:
|
|
||||||
update_cache: yes
|
|
||||||
cache_valid_time: 3600
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: install ufw
|
|
||||||
apt:
|
|
||||||
name: ufw
|
|
||||||
state: present
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: Allow SSH in UFW
|
|
||||||
ufw:
|
|
||||||
rule: allow
|
|
||||||
port: "{{ ansible_ssh_port }}"
|
|
||||||
proto: tcp
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: Allow wireguard port in UFW
|
|
||||||
ufw:
|
|
||||||
rule: allow
|
|
||||||
port: "{{ wireguard_port }}"
|
|
||||||
proto: udp
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: Set ufw logging
|
|
||||||
ufw:
|
|
||||||
logging: "on"
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: inter-node Wireguard UFW connectivity
|
|
||||||
ufw:
|
|
||||||
rule: allow
|
|
||||||
src: "{{ hostvars[item].wireguard_ip }}"
|
|
||||||
with_items: "{{ groups['all'] }}"
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled and item != inventory_hostname
|
|
||||||
|
|
||||||
- name: Reject everything and enable UFW
|
|
||||||
ufw:
|
|
||||||
state: enabled
|
|
||||||
policy: reject
|
|
||||||
log: yes
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: Allow 6443 in UFW /tcp
|
|
||||||
ufw:
|
|
||||||
rule: allow
|
|
||||||
port: "6443"
|
|
||||||
proto: tcp
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
||||||
|
|
||||||
- name: Allow 6443 in UFW udp
|
|
||||||
ufw:
|
|
||||||
rule: allow
|
|
||||||
port: "6443"
|
|
||||||
proto: udp
|
|
||||||
become: yes
|
|
||||||
when: ufw_enabled
|
|
@ -1,5 +1,4 @@
|
|||||||
---
|
---
|
||||||
|
|
||||||
- name: Copy K3s service file
|
- name: Copy K3s service file
|
||||||
register: k3s_service
|
register: k3s_service
|
||||||
template:
|
template:
|
||||||
|
@ -7,7 +7,7 @@ After=network-online.target
|
|||||||
Type=notify
|
Type=notify
|
||||||
ExecStartPre=-/sbin/modprobe br_netfilter
|
ExecStartPre=-/sbin/modprobe br_netfilter
|
||||||
ExecStartPre=-/sbin/modprobe overlay
|
ExecStartPre=-/sbin/modprobe overlay
|
||||||
ExecStart=/usr/local/bin/k3s server --data-dir {{ k3s_server_location }} {{ extra_server_args | default("") }} --advertise-address {{master_ip}}
|
ExecStart=/usr/local/bin/k3s server --data-dir {{ k3s_server_location }} {{ extra_server_args | default("") }}
|
||||||
KillMode=process
|
KillMode=process
|
||||||
Delegate=yes
|
Delegate=yes
|
||||||
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
||||||
|
@ -7,7 +7,7 @@ After=network-online.target
|
|||||||
Type=notify
|
Type=notify
|
||||||
ExecStartPre=-/sbin/modprobe br_netfilter
|
ExecStartPre=-/sbin/modprobe br_netfilter
|
||||||
ExecStartPre=-/sbin/modprobe overlay
|
ExecStartPre=-/sbin/modprobe overlay
|
||||||
ExecStart=/usr/local/bin/k3s agent --server https://{{ master_ip }}:6443 --token {{ hostvars[groups['serverctl_master_hosts'][0]]['token'] }} {{ extra_agent_args | default("") }} --node-ip {{inventory_hostname}}
|
ExecStart=/usr/local/bin/k3s agent --server https://{{ master_ip }}:6443 --token {{ hostvars[groups['serverctl_master_hosts'][0]]['token'] }} {{ extra_agent_args | default("") }}
|
||||||
KillMode=process
|
KillMode=process
|
||||||
Delegate=yes
|
Delegate=yes
|
||||||
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
# Having non-zero Limit*s causes performance problems due to accounting overhead
|
||||||
|
@ -1,7 +0,0 @@
|
|||||||
---
|
|
||||||
- name: systemd network restart
|
|
||||||
service:
|
|
||||||
name: systemd-networkd
|
|
||||||
state: restarted
|
|
||||||
enabled: yes
|
|
||||||
become: yes
|
|
@ -1,89 +0,0 @@
|
|||||||
---
|
|
||||||
- name: install wireguard
|
|
||||||
apt:
|
|
||||||
name: wireguard
|
|
||||||
state: present
|
|
||||||
become: yes
|
|
||||||
when: ansible_distribution == 'Debian' or ansible_distribution == "Ubuntu"
|
|
||||||
|
|
||||||
- name: install wireguard
|
|
||||||
pacman:
|
|
||||||
name: wireguard-tools
|
|
||||||
state: present
|
|
||||||
become: yes
|
|
||||||
when: ansible_distribution == "Archlinux"
|
|
||||||
|
|
||||||
- name: generate wireguard keypair
|
|
||||||
shell: wg genkey | tee /etc/wireguard/serverctl-privatekey | wg pubkey | tee /etc/wireguard/serverctl-publickey
|
|
||||||
args:
|
|
||||||
creates: /etc/wireguard/serverctl-privatekey
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: register private key
|
|
||||||
shell: cat /etc/wireguard/serverctl-privatekey
|
|
||||||
register: wireguard_private_key
|
|
||||||
changed_when: false
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: register public key
|
|
||||||
shell: cat /etc/wireguard/serverctl-publickey
|
|
||||||
register: wireguard_public_key
|
|
||||||
changed_when: false
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: generate preshared keypair
|
|
||||||
shell: "wg genpsk > /etc/wireguard/serverctl-psk-{{item}}"
|
|
||||||
args:
|
|
||||||
creates: "/etc/wireguard/serverctl-psk-{{item}}"
|
|
||||||
when: inventory_hostname < item
|
|
||||||
with_items: "{{groups['serverctl_super_cluster']}}"
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: register preshared key
|
|
||||||
shell: "cat /etc/wireguard/serverctl-psk-{{item}}"
|
|
||||||
register: wireguard_preshared_key
|
|
||||||
changed_when: false
|
|
||||||
when: inventory_hostname < item
|
|
||||||
with_items: "{{groups['serverctl_super_cluster']}}"
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
- name: message preshared keys
|
|
||||||
set_fact: "wireguard_preshared_keys={{wireguard_preshared_keys|default({}) | combine({item.item: item.stdout})}}"
|
|
||||||
when: item.skipped is not defined
|
|
||||||
with_items: "{{wireguard_preshared_key.results}}"
|
|
||||||
become: yes
|
|
||||||
|
|
||||||
#- name: print hostvars
|
|
||||||
# ansible.builtin.debug:
|
|
||||||
# msg: "{{hostvars[item]}}"
|
|
||||||
# with_items: "{{groups['serverctl_super_cluster']}}"
|
|
||||||
|
|
||||||
- name: Setup wg0 device
|
|
||||||
template:
|
|
||||||
src: 'systemd.netdev'
|
|
||||||
dest: '{{systemd_network_dir}}/99-serverctl-wg0.netdev'
|
|
||||||
owner: root
|
|
||||||
group: systemd-network
|
|
||||||
mode: 0640
|
|
||||||
become: yes
|
|
||||||
notify: systemd network restart
|
|
||||||
|
|
||||||
- name: Setup wg0 network
|
|
||||||
template:
|
|
||||||
src: 'systemd.network'
|
|
||||||
dest: "{{systemd_network_dir}}/99-serverctl-wg0.network"
|
|
||||||
owner: root
|
|
||||||
group: systemd-network
|
|
||||||
mode: 0640
|
|
||||||
become: yes
|
|
||||||
notify: systemd network restart
|
|
||||||
|
|
||||||
#- name: Start and enalbe wireguard on book
|
|
||||||
# systemd:
|
|
||||||
# name: wg-quick@wgserverctl0
|
|
||||||
# enabled: yes
|
|
||||||
# state: started
|
|
||||||
|
|
||||||
#- debug: msg="{{item.1}} - {{ (wireguard_base_ipv4|ipaddr(item.0 + 1)) }}"
|
|
||||||
# with_indexed_items: "{{groups.serverctl_mesh_nodes}}"
|
|
||||||
|
|
@ -1,22 +0,0 @@
|
|||||||
[NetDev]
|
|
||||||
Name=serverctl-wg0
|
|
||||||
Kind=wireguard
|
|
||||||
Description=WireGuard tunnel serverctl-wg0
|
|
||||||
|
|
||||||
[WireGuard]
|
|
||||||
ListenPort={{ wireguard_port }}
|
|
||||||
PrivateKey={{ wireguard_private_key.stdout }}
|
|
||||||
|
|
||||||
{% for peer in groups['serverctl_super_cluster'] %}
|
|
||||||
{% if peer != inventory_hostname %}
|
|
||||||
|
|
||||||
[WireGuardPeer]
|
|
||||||
PublicKey={{ hostvars[peer].wireguard_public_key.stdout }}
|
|
||||||
PresharedKey={{ wireguard_preshared_keys[peer] if inventory_hostname < peer else hostvars[peer].wireguard_preshared_keys[inventory_hostname] }}
|
|
||||||
AllowedIPs={{ hostvars[peer].wireguard_ip }}/32
|
|
||||||
{% if not hostvars[peer].client_server %}
|
|
||||||
Endpoint={{ hostvars[peer].ansible_host }}:{{ wireguard_port }}
|
|
||||||
PersistentKeepalive=25
|
|
||||||
{% endif %}
|
|
||||||
{% endif %}
|
|
||||||
{% endfor %}
|
|
@ -1,5 +0,0 @@
|
|||||||
[Match]
|
|
||||||
Name=serverctl-wg0
|
|
||||||
|
|
||||||
[Network]
|
|
||||||
Address={{ wireguard_ip }}/{{ wireguard_mask_bits }}
|
|
@ -5,21 +5,12 @@
|
|||||||
roles:
|
roles:
|
||||||
- role: prereq
|
- role: prereq
|
||||||
- role: download
|
- role: download
|
||||||
- role: firewall
|
|
||||||
|
|
||||||
- hosts: serverctl_super_cluster
|
|
||||||
gather_facts: yes
|
|
||||||
become: yes
|
|
||||||
roles:
|
|
||||||
- role: wireguard/mesh
|
|
||||||
|
|
||||||
- hosts: serverctl_master_hosts
|
- hosts: serverctl_master_hosts
|
||||||
become: yes
|
become: yes
|
||||||
roles:
|
roles:
|
||||||
- role: "./k3s/master"
|
- role: "./k3s/master"
|
||||||
|
#- hosts: serverctl_node_hosts
|
||||||
- hosts: serverctl_node_hosts
|
# become: yes
|
||||||
become: yes
|
# roles:
|
||||||
roles:
|
# - role: "./k3s/node"
|
||||||
- role: "./k3s/node"
|
#
|
||||||
|
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
|
||||||
variable "serverctl_master_count" {
|
variable "serverctl_master_count" {
|
||||||
default = 0
|
default = 0
|
||||||
}
|
}
|
||||||
@ -6,6 +7,7 @@ variable "serverctl_node_count" {
|
|||||||
default = 0
|
default = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
resource "hcloud_placement_group" "serverctl_master" {
|
resource "hcloud_placement_group" "serverctl_master" {
|
||||||
name = "serverctl_master_group"
|
name = "serverctl_master_group"
|
||||||
type = "spread"
|
type = "spread"
|
||||||
@ -16,7 +18,7 @@ resource "hcloud_server" "serverctl_master" {
|
|||||||
name = "serverctl-master-${count.index}"
|
name = "serverctl-master-${count.index}"
|
||||||
image = "debian-11"
|
image = "debian-11"
|
||||||
server_type = "cx11"
|
server_type = "cx11"
|
||||||
ssh_keys = [
|
ssh_keys = [
|
||||||
var.hcloud_serverctl_ssh_key_id
|
var.hcloud_serverctl_ssh_key_id
|
||||||
]
|
]
|
||||||
placement_group_id = hcloud_placement_group.serverctl_master.id
|
placement_group_id = hcloud_placement_group.serverctl_master.id
|
||||||
@ -48,7 +50,7 @@ resource "hcloud_server" "serverctl_node" {
|
|||||||
name = "serverctl-node-${count.index}"
|
name = "serverctl-node-${count.index}"
|
||||||
image = "debian-11"
|
image = "debian-11"
|
||||||
server_type = "cx11"
|
server_type = "cx11"
|
||||||
ssh_keys = [
|
ssh_keys = [
|
||||||
var.hcloud_serverctl_ssh_key_id
|
var.hcloud_serverctl_ssh_key_id
|
||||||
]
|
]
|
||||||
placement_group_id = hcloud_placement_group.serverctl_node.id
|
placement_group_id = hcloud_placement_group.serverctl_node.id
|
||||||
@ -71,10 +73,10 @@ resource "hcloud_server" "serverctl_node" {
|
|||||||
}
|
}
|
||||||
|
|
||||||
resource "local_file" "hosts_cfg" {
|
resource "local_file" "hosts_cfg" {
|
||||||
content = templatefile("${path.module}/templates/hosts.tftpl",
|
content = templatefile("${path.module}/templates/hosts.tpl",
|
||||||
{
|
{
|
||||||
serverctl_masters = hcloud_server.serverctl_master.*.ipv4_address
|
serverctl_masters = hcloud_server.serverctl_master.*.ipv4_address
|
||||||
serverctl_nodes = hcloud_server.serverctl_node.*.ipv4_address
|
serverctl_nodes = hcloud_server.serverctl_node.*.ipv4_address
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
filename = "ansible/inventory/hosts.cfg"
|
filename = "ansible/inventory/hosts.cfg"
|
||||||
|
@ -2,7 +2,7 @@ terraform {
|
|||||||
required_providers {
|
required_providers {
|
||||||
hcloud = {
|
hcloud = {
|
||||||
source = "hetznercloud/hcloud"
|
source = "hetznercloud/hcloud"
|
||||||
version = "1.49.1"
|
version = "1.32.2"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
[serverctl_master_hosts]
|
|
||||||
%{ for ip in serverctl_masters ~}
|
|
||||||
${ip} ansible_host=${ip} wireguard_ip=${cidrhost("10.1.1.0/24", index(serverctl_masters, ip) + 1)}
|
|
||||||
%{ endfor ~}
|
|
||||||
|
|
||||||
[serverctl_node_hosts]
|
|
||||||
%{ for ip in serverctl_nodes ~}
|
|
||||||
${ip} ansible_host=${ip} wireguard_ip=${cidrhost("10.1.1.0/24", index(serverctl_nodes, ip) + 10)}
|
|
||||||
%{ endfor ~}
|
|
||||||
|
|
||||||
[serverctl_home_servers]
|
|
||||||
192.168.1.150 ansible_host=192.168.1.150 wireguard_ip=10.1.1.8
|
|
||||||
#192.168.1.233 ansible_host=192.168.1.233 wireguard_ip=10.1.1.9
|
|
||||||
|
|
||||||
[serverctl_cluster:children]
|
|
||||||
serverctl_master_hosts
|
|
||||||
serverctl_node_hosts
|
|
||||||
|
|
||||||
[serverctl_super_cluster:children]
|
|
||||||
serverctl_cluster
|
|
||||||
serverctl_home_servers
|
|
||||||
|
|
||||||
[serverctl_home_servers:vars]
|
|
||||||
client_server=True
|
|
||||||
|
|
||||||
[serverctl_super_cluster:vars]
|
|
||||||
pipelining=true
|
|
||||||
ansible_ssh_user=root
|
|
||||||
ansible_ssh_port=22
|
|
||||||
|
|
||||||
[serverctl_cluster:vars]
|
|
||||||
client_server=False
|
|
||||||
pipelining=true
|
|
||||||
ansible_ssh_user=root
|
|
||||||
ansible_ssh_port=22
|
|
13
infrastructure/create-resources/templates/hosts.tpl
Normal file
13
infrastructure/create-resources/templates/hosts.tpl
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[serverctl_master_hosts]
|
||||||
|
%{ for ip in serverctl_masters ~}
|
||||||
|
${ip}
|
||||||
|
%{ endfor ~}
|
||||||
|
|
||||||
|
[serverctl_node_hosts]
|
||||||
|
%{ for ip in serverctl_nodes ~}
|
||||||
|
${ip}
|
||||||
|
%{ endfor ~}
|
||||||
|
|
||||||
|
[serverctl_cluster:children]
|
||||||
|
serverctl_master_hosts
|
||||||
|
serverctl_node_hosts
|
@ -1,3 +0,0 @@
|
|||||||
{
|
|
||||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.23-bullseye
|
FROM golang:1.17-bullseye
|
||||||
|
|
||||||
RUN go install github.com/jackc/tern@latest
|
RUN go install github.com/jackc/tern@latest
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.23-bullseye
|
FROM golang:1.17-bullseye
|
||||||
|
|
||||||
RUN go install github.com/cosmtrek/air@latest
|
RUN go install github.com/cosmtrek/air@latest
|
||||||
# Development don't need this
|
# Development don't need this
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"serverctl/pkg/api"
|
"serverctl/pkg/api"
|
||||||
"serverctl/pkg/infrastructure"
|
"serverctl/pkg/infrastructure"
|
||||||
"serverctl/pkg/infrastructure/dependencies"
|
"serverctl/pkg/infrastructure/dependencies"
|
||||||
|
"serverctl/pkg/infrastructure/queue"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
|
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
|
||||||
@ -11,6 +12,12 @@ func Run() {
|
|||||||
d := dependencies.New()
|
d := dependencies.New()
|
||||||
d.Logger.Info("Starting serverctl")
|
d.Logger.Info("Starting serverctl")
|
||||||
|
|
||||||
|
queue.NewRabbitMQ(d.Logger, &queue.RabbitMqConfig{
|
||||||
|
Username: "serverctl",
|
||||||
|
Password: "serverctlsecret",
|
||||||
|
Host: "rabbitmq",
|
||||||
|
Port: 5672,
|
||||||
|
})
|
||||||
// if development add seed data
|
// if development add seed data
|
||||||
infrastructure.AddSeedData(d.Database, d.Logger)
|
infrastructure.AddSeedData(d.Database, d.Logger)
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
module serverctl
|
module serverctl
|
||||||
|
|
||||||
go 1.19
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Microsoft/go-winio v0.4.17 // indirect
|
github.com/Microsoft/go-winio v0.4.17 // indirect
|
||||||
@ -18,7 +18,6 @@ require (
|
|||||||
github.com/docker/go-units v0.4.0 // indirect
|
github.com/docker/go-units v0.4.0 // indirect
|
||||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||||
github.com/eko/gocache v1.2.0 // indirect
|
github.com/eko/gocache v1.2.0 // indirect
|
||||||
github.com/georgysavva/scany v0.3.0 // indirect
|
|
||||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||||
github.com/gin-gonic/gin v1.7.7 // indirect
|
github.com/gin-gonic/gin v1.7.7 // indirect
|
||||||
github.com/go-co-op/gocron v1.12.0 // indirect
|
github.com/go-co-op/gocron v1.12.0 // indirect
|
||||||
@ -37,7 +36,6 @@ require (
|
|||||||
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
|
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||||
github.com/jackc/pgtype v1.10.0 // indirect
|
github.com/jackc/pgtype v1.10.0 // indirect
|
||||||
github.com/jackc/pgx v3.6.2+incompatible // indirect
|
|
||||||
github.com/jackc/pgx/v4 v4.15.0 // indirect
|
github.com/jackc/pgx/v4 v4.15.0 // indirect
|
||||||
github.com/jackc/puddle v1.2.1 // indirect
|
github.com/jackc/puddle v1.2.1 // indirect
|
||||||
github.com/json-iterator/go v1.1.10 // indirect
|
github.com/json-iterator/go v1.1.10 // indirect
|
||||||
@ -54,6 +52,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.2.0 // indirect
|
github.com/prometheus/client_model v0.2.0 // indirect
|
||||||
github.com/prometheus/common v0.18.0 // indirect
|
github.com/prometheus/common v0.18.0 // indirect
|
||||||
github.com/prometheus/procfs v0.6.0 // indirect
|
github.com/prometheus/procfs v0.6.0 // indirect
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0 // indirect
|
||||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||||
github.com/spf13/cast v1.3.1 // indirect
|
github.com/spf13/cast v1.3.1 // indirect
|
||||||
|
@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
|
|||||||
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
||||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||||
@ -846,6 +848,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
|
|||||||
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
||||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
|
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
|
||||||
|
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
|
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
|
||||||
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
type Queue interface {
|
||||||
|
}
|
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
@ -0,0 +1,217 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
import amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
|
||||||
|
type RabbitMQ struct {
|
||||||
|
logger *zap.Logger
|
||||||
|
config *RabbitMqConfig
|
||||||
|
conn *amqp.Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Queue = RabbitMQ{}
|
||||||
|
|
||||||
|
type RabbitMqConfig struct {
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
Host string
|
||||||
|
Port int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
|
||||||
|
|
||||||
|
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
//sendBasic(logger, config, conn)
|
||||||
|
//receiveBasic(logger, config, conn)
|
||||||
|
sendPublishingBasic(logger, config, conn)
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
receivePublishingBasic(logger, config, conn)
|
||||||
|
}
|
||||||
|
//sendMany(logger, config, conn)
|
||||||
|
|
||||||
|
return &RabbitMQ{
|
||||||
|
logger: logger,
|
||||||
|
config: config,
|
||||||
|
conn: conn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := "Hello world!"
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(q.Name,
|
||||||
|
"",
|
||||||
|
true,
|
||||||
|
false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
logger.Info("Received msg", zap.String("body", string(d.Body)))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := "Hello world!"
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent,
|
||||||
|
ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.Qos(1, 0, false)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(q.Name,
|
||||||
|
"",
|
||||||
|
false,
|
||||||
|
false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
|
||||||
|
t := time.Duration(len(d.Body))
|
||||||
|
time.Sleep(t * time.Second)
|
||||||
|
logger.Info("Received msg: Done")
|
||||||
|
d.Ack(false)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user