#include "thread.h"
#include <proton/engine.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define SSL_FILE(NAME) "ssl-certs/" NAME
#define SSL_PW "tserverpw"
#if defined(_WIN32)
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
#else
# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
# define SET_CREDENTIALS(DOMAIN, NAME) \
pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW)
#endif
/* Minimal growable array of T: `data` is the heap buffer, `len` the number of
 * elements in use and `cap` the number of elements the buffer can hold. */
#define VEC(T) struct { T *data; size_t len; size_t cap; }
/* Start V empty with room for 16 elements.  The buffer pointer is stored
 * through a void** alias of V.data, so no cast to the element type is needed.
 * The malloc() result is not checked. */
#define VEC_INIT(V) \
  do { \
    void **vec_slot_ = (void**)&V.data; \
    V.cap = 16; \
    V.len = 0; \
    *vec_slot_ = malloc(V.cap * sizeof(*V.data)); \
  } while(0)
/* Release V's buffer; the elements themselves are not touched. */
#define VEC_FINAL(V) free(V.data)
/* Append X to the end of vector V, doubling the capacity first when V is full.
 * V and X are evaluated more than once, so they must be free of side effects.
 * NOTE: the realloc() result is not checked; on failure V.data becomes NULL.
 *
 * The definition must end WITHOUT a trailing backslash: a continuation on the
 * last line would splice the following #define into this macro's body. */
#define VEC_PUSH(V, X) \
  do { \
    if ((V).len == (V).cap) { \
      void **vp = (void**)&(V).data; \
      (V).cap *= 2; \
      *vp = realloc((V).data, (V).cap * sizeof(*(V).data)); \
    } \
    (V).data[(V).len++] = (X); \
  } while(0)
/* Remove the element at the front of vector V, if there is one, by shifting
 * the remaining elements down one slot.  Does nothing when V is empty. */
#define VEC_POP(V) \
  do { \
    if ((V).len > 0) \
      memmove((V).data, (V).data+1, (--(V).len)*sizeof(*(V).data)); \
  } while(0)
/*
 * A named queue.  Queues are chained into a singly linked list through `next`.
 *
 * NOTE(review): queue_init() and queue_destroy() in this file access
 * q->messages and q->waiting, which are not declared in this struct as shown --
 * confirm against the complete source.
 */
typedef struct queue_t queue_t;
struct queue_t {
  pthread_mutex_t lock;   /* created in queue_init(), taken around queue updates */
  char *name;             /* heap copy of the queue name, freed in queue_destroy() */
  queue_t *next;          /* next queue in the list, or NULL */
  size_t sent;            /* bumped in queue_send() each time a message is dequeued */
};
/*
 * Initialise the caller-provided queue *q.
 *
 * q    - storage for the queue; its mutex is created here.
 * name - NUL-terminated queue name; a private heap copy is stored in q->name.
 * next - value stored in q->next (the queue that follows q in the list).
 *
 * The messages and waiting vectors start empty and the sent counter at 0.
 * Allocation results are not checked.
 */
static void queue_init(queue_t *q, const char* name, queue_t *next) {
  const size_t name_size = strlen(name) + 1;   /* include the terminating NUL */

  pthread_mutex_init(&q->lock, NULL);
  q->name = (char*)malloc(name_size);
  memcpy(q->name, name, name_size);
  VEC_INIT(q->messages);
  VEC_INIT(q->waiting);
  q->sent = 0;
  q->next = next;
}
/*
 * Release everything owned by *q: its mutex, the buffer of every message still
 * queued, one reference on every entry of the waiting list (via pn_decref),
 * both vectors and the name string.  The queue_t storage itself is not freed.
 */
static void queue_destroy(queue_t *q) {
  size_t idx;

  pthread_mutex_destroy(&q->lock);

  idx = 0;
  while (idx < q->messages.len) {
    free(q->messages.data[idx].start);
    idx++;
  }
  VEC_FINAL(q->messages);

  idx = 0;
  while (idx < q->waiting.len) {
    pn_decref(q->waiting.data[idx]);
    idx++;
  }
  VEC_FINAL(q->waiting);

  free(q->name);
}
/*
 * Take the message at the head of queue q, or register for a later wake-up.
 *
 * With q->lock held:
 *  - if a message is queued, it is copied into `m`, removed from the front of
 *    the vector, and q->sent is incremented (the new count is kept in `tag`);
 *  - otherwise `c` is appended to q->waiting unless it is already listed.
 * After unlocking, m.start is freed.
 *
 * NOTE(review): `m` and `c` are not declared anywhere in the visible code, and
 * neither `s` nor `tag` is used after being set -- lines appear to be missing
 * from this excerpt; confirm against the complete source.
 */
static void queue_send(queue_t *q, pn_link_t *s) {
  size_t tag = 0;

  pthread_mutex_lock(&q->lock);
  if (q->messages.len != 0) {
    m = q->messages.data[0];
    VEC_POP(q->messages);
    q->sent += 1;
    tag = q->sent;
  } else {
    /* Nothing queued: look for c in the waiting list, add it if absent. */
    size_t pos = 0;
    while (pos < q->waiting.len && q->waiting.data[pos] != c) {
      ++pos;
    }
    if (pos == q->waiting.len) {
      VEC_PUSH(q->waiting, c);
    }
  }
  pthread_mutex_unlock(&q->lock);

  free(m.start);
}
}
}
}
}
}
pthread_mutex_lock(&q->lock);
VEC_PUSH(q->messages, m);
if (q->messages.len == 1) {
size_t i;
for (i = 0; i < q->waiting.len; ++i) {
set_check_queues(c, true);
}
q->waiting.len = 0;
}
pthread_mutex_unlock(&q->lock);
}
/* The set of all queues: a mutex-protected singly linked list of queue_t. */
typedef struct queues_t queues_t;
struct queues_t {
  pthread_mutex_t lock;   /* taken by queues_get() while searching/inserting */
  queue_t *queues;        /* head of the list, NULL when there are no queues */
  size_t sent;            /* not read or written by any code visible in this file */
};
/*
 * Initialise the caller-provided queue collection *qs: create its mutex and
 * start with an empty list.
 *
 * Every member is given a defined value here (including the `sent` counter),
 * so a queues_t that was not zero-initialised by its owner is still fully
 * initialised after this call.
 */
void queues_init(queues_t *qs) {
  pthread_mutex_init(&qs->lock, NULL);
  qs->queues = NULL;
  qs->sent = 0;
}
/*
 * Tear down the collection *qs: unlink, destroy and free every queue in the
 * list from the head onwards, then destroy the collection mutex.  The
 * queues_t storage itself is not freed.
 */
void queues_destroy(queues_t *qs) {
  queue_t *head;

  for (head = qs->queues; head != NULL; head = qs->queues) {
    qs->queues = head->next;   /* unlink before the node is released */
    queue_destroy(head);
    free(head);
  }
  pthread_mutex_destroy(&qs->lock);
}
/*
 * Return the queue called `name`, creating it if it does not exist yet.
 *
 * The list is searched with strcmp() while qs->lock is held.  When no queue
 * matches, a new one is allocated, initialised with queue_init() and pushed
 * onto the front of the list.  The malloc() result is not checked.
 *
 * Returns a pointer owned by the collection (released in queues_destroy()).
 */
queue_t* queues_get(queues_t *qs, const char* name) {
  queue_t *found;

  pthread_mutex_lock(&qs->lock);
  found = qs->queues;
  while (found != NULL && strcmp(found->name, name) != 0) {
    found = found->next;
  }
  if (found == NULL) {
    found = (queue_t*)malloc(sizeof(*found));
    queue_init(found, name, qs->queues);
    qs->queues = found;
  }
  pthread_mutex_unlock(&qs->lock);
  return found;
}
/*
 * State shared by all broker threads.
 *
 * NOTE(review): other code in this file refers to b->ssl_domain (and passes
 * b->proactor to queue_receive), but neither member is declared here as shown
 * -- confirm against the complete source.
 */
typedef struct broker_t broker_t;
struct broker_t {
  size_t threads;             /* total thread count, including the main thread (see main()) */
  const char *container_id;   /* set to argv[0] in main() */
  queues_t queues;            /* all queues, looked up by name with queues_get() */
  bool finished;              /* set to true to make broker_thread() leave its loop */
};
/*
 * Ask the broker to stop.
 *
 * NOTE(review): the body is empty in the visible source, so this is currently
 * a no-op -- confirm whether a statement is missing here.
 */
void broker_stop(broker_t *b) {
  (void)b;   /* parameter intentionally unused while the body is empty */
}
/*
 * Look up (creating on demand) the queue named `qname` in b->queues and hand
 * it, together with link s, to queue_send().
 *
 * NOTE(review): `qname` is not declared in the visible code -- a line deriving
 * it appears to be missing; confirm against the complete source.
 */
static void link_send(broker_t *b, pn_link_t *s) {
  queue_send(queues_get(&b->queues, qname), s);
}
}
size_t i;
pthread_mutex_lock(&q->lock);
for (i = 0; i < q->waiting.len; ++i) {
if (q->waiting.data[i] == c){
q->waiting.data[i] = q->waiting.data[0];
VEC_POP(q->waiting);
break;
}
}
pthread_mutex_unlock(&q->lock);
}
/*
 * When `qname` is non-NULL, look up (creating on demand) the queue of that
 * name in b->queues.  Nothing is done with the queue afterwards in the
 * visible code.
 *
 * NOTE(review): `qname` is not declared in the visible code and the looked-up
 * queue is unused -- lines appear to be missing; confirm against the complete
 * source.
 */
static void link_unsub(broker_t *b, pn_link_t *s) {
  if (!qname) {
    return;
  }
  (void)queues_get(&b->queues, qname);
}
}
link_unsub(b, l);
}
link_unsub(b, l);
}
}
}
}
/* NOTE(review): presumably the incoming credit window granted to senders; no
 * use of WINDOW is visible in this excerpt -- confirm against the full source. */
const int WINDOW=5;
printf("listening on %s\n", port);
fflush(stdout);
break;
}
if (b->ssl_domain) {
}
break;
}
break;
break;
}
if (get_check_queues(c)) {
set_check_queues(c, false);
link_send(b, l);
}
break;
}
break;
}
} else {
}
break;
}
break;
}
if (buf) {
free(buf->start);
free(buf);
}
break;
}
ssize_t recv;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
fprintf(stderr, "Message aborted\n");
fflush(stderr);
m->size = 0;
}
else if (recv < 0 && recv !=
PN_EOS) {
queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
*m = pn_rwbytes_null;
}
}
break;
}
break;
break;
break;
break;
broker_stop(b);
break;
broker_stop(b);
break;
b->finished = true;
break;
default:
break;
}
}
/*
 * Thread entry point: keeps calling handle() for broker b until b->finished
 * becomes true, then returns NULL.
 *
 * void_broker is the broker_t* that main() passes through pthread_create()'s
 * void* argument (main() also calls this function directly on its own thread).
 *
 * NOTE(review): `e` is not declared in the visible code and the braces below
 * do not balance -- lines appear to be missing from this excerpt; confirm
 * against the complete source.
 */
static void* broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
do {
handle(b, e);
}
} while(!b->finished);
return NULL;
}
/*
 * Program entry point.
 *
 * argv[1] - optional host (default ""), argv[2] - optional port (default "amqp").
 * NOTE(review): host and port are not used by any statement visible in this
 * excerpt -- the code that consumes them appears to be missing.
 *
 * Sets up the broker state, loads the "tserver" credentials, then runs
 * broker_thread() on b.threads threads in total: b.threads-1 worker threads
 * plus the main thread itself.  Returns 0 once every started thread has
 * finished, or 1 if the thread-handle array cannot be allocated.
 */
int main(int argc, char **argv) {
  const char *host = (argc > 1) ? argv[1] : "";
  const char *port = (argc > 2) ? argv[2] : "amqp";
  broker_t b = {0};
  queues_init(&b.queues);
  b.container_id = argv[0];
  b.threads = 4;
  SET_CREDENTIALS(b.ssl_domain, "tserver");
  {
    /* calloc takes (count, element size) in that order. */
    pthread_t* threads = (pthread_t*)calloc(b.threads, sizeof(pthread_t));
    size_t started = 0;   /* number of worker threads actually running */
    size_t i;
    if (!threads) {
      fprintf(stderr, "Failed to allocate thread handles\n");
      return 1;
    }
    for (i = 0; i < b.threads-1; ++i) {
      /* Only handles of successfully created threads are kept, so that the
       * join loop below never touches a handle that was never started. */
      if (pthread_create(&threads[started], NULL, broker_thread, &b) == 0) {
        ++started;
      } else {
        fprintf(stderr, "Failed to start broker thread\n");
      }
    }
    broker_thread(&b);   /* the main thread does its share of the work too */
    for (i = 0; i < started; ++i) {
      pthread_join(threads[i], NULL);
    }
    free(threads);
    return 0;
  }
}