Program Listing for File CBorStream.h
↰ Return to documentation for file (src/desert_classes/CBorStream.h)
/****************************************************************************
* Copyright (C) 2024 Davide Costa *
* *
* This file is part of RMW desert. *
* *
* RMW desert is free software: you can redistribute it and/or modify it *
* under the terms of the GNU General Public License as published by the *
* Free Software Foundation, either version 3 of the License, or any *
* later version. *
* *
* RMW desert is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with RMW desert. If not, see <http://www.gnu.org/licenses/>. *
****************************************************************************/
#ifndef CBORSTREAM_H_
#define CBORSTREAM_H_
#include "TcpDaemon.h"
#include "TopicsConfig.h"
#include <map>
#include <queue>
#include <utility>
#include <vector>
#include <string>
#include <locale>
#include <codecvt>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include "cbor/encoder.h"
#include "cbor/ieee754.h"
#include "cbor/decoder.h"
#include "cbor/parser.h"
#include "cbor/helper.h"
#include "half.hpp"
#define PUBLISHER_TYPE 0
#define SUBSCRIBER_TYPE 1
#define CLIENT_TYPE 2
#define SERVICE_TYPE 3
#define MAX_BUFFER_CAPACITY 100
template <typename T, int MaxLen, typename Container=std::deque<T>>
class CircularQueue : public std::queue<T, Container> {
public:
void push(const T& value)
{
if (this->size() == MaxLen)
{
this->c.pop_front();
}
std::queue<T, Container>::push(value);
}
};
namespace cbor
{
class TxStream
{
public:
TxStream(uint8_t stream_type, std::string stream_name, uint8_t stream_identifier);
void start_transmission(uint64_t sequence_id);
void start_transmission();
void end_transmission();
TxStream & operator<<(const uint64_t n);
TxStream & operator<<(const uint32_t n);
TxStream & operator<<(const uint16_t n);
TxStream & operator<<(const uint8_t n);
TxStream & operator<<(const int64_t n);
TxStream & operator<<(const int32_t n);
TxStream & operator<<(const int16_t n);
TxStream & operator<<(const int8_t n);
TxStream & operator<<(const char n);
TxStream & operator<<(const float f);
TxStream & operator<<(const double d);
TxStream & operator<<(const std::string s);
TxStream & operator<<(const std::u16string s);
TxStream & operator<<(const bool b);
template<typename T>
inline TxStream & operator<<(const std::vector<T> v)
{
*this << static_cast<const uint32_t>(v.size());
return serialize_sequence(v.data(), v.size());
}
TxStream & operator<<(const std::vector<bool> v);
template<typename T>
inline TxStream & serialize_sequence(const T * items, size_t size)
{
for (size_t i = 0; i < size; ++i)
{
*this << items[i];
}
return *this;
}
private:
uint8_t _stream_type;
std::string _stream_name;
uint8_t _stream_identifier;
bool _overflow;
uint8_t * _packet;
cbor_writer_t * _writer;
void new_packet();
void handle_overrun(cbor_error_t result);
std::string toUTF8(const std::u16string source);
};
class RxStream
{
public:
RxStream(uint8_t stream_type, std::string stream_name, uint8_t stream_identifier);
bool data_available(int64_t sequence_id = 0);
void clear_buffer();
RxStream & operator>>(uint64_t & n);
RxStream & operator>>(uint32_t & n);
RxStream & operator>>(uint16_t & n);
RxStream & operator>>(uint8_t & n);
RxStream & operator>>(int64_t & n);
RxStream & operator>>(int32_t & n);
RxStream & operator>>(int16_t & n);
RxStream & operator>>(int8_t & n);
template<typename T>
RxStream & deserialize_integer(T & n);
RxStream & operator>>(char & n);
RxStream & operator>>(float & f);
RxStream & operator>>(double & d);
RxStream & operator>>(std::string & s);
RxStream & operator>>(std::u16string & s);
RxStream & operator>>(bool & b);
template<typename T>
inline RxStream & operator>>(std::vector<T> & v)
{
uint32_t size;
*this >> size;
v.resize(size);
return deserialize_sequence(v.data(), size);
}
RxStream & operator>>(std::vector<bool> & v);
template<typename T>
inline RxStream & deserialize_sequence(T * items, size_t size)
{
for (size_t i = 0; i < size; ++i)
{
*this >> items[i];
}
return *this;
}
static void interpret_packets();
private:
uint8_t _stream_type;
std::string _stream_name;
uint8_t _stream_identifier;
int _buffered_iterator;
std::vector<std::pair<void *, int>> _buffered_packet;
// <topic, packets <packet <field, field_type>>>
static std::map<uint32_t, CircularQueue<std::vector<std::pair<void *, int>>, MAX_BUFFER_CAPACITY>> _interpreted_publications;
// <service, packets <packet <field, field_type>>>
static std::map<uint32_t, CircularQueue<std::vector<std::pair<void *, int>>, MAX_BUFFER_CAPACITY>> _interpreted_requests;
// <service + id, packets <packet <field, field_type>>>
static std::map<uint32_t, CircularQueue<std::vector<std::pair<void *, int>>, MAX_BUFFER_CAPACITY>> _interpreted_responses;
union _cbor_value {
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
float f32;
double f64;
uint8_t *bin;
char *str;
uint8_t str_copy[128];
};
static std::mutex _rx_mutex;
static std::pair<void *, int> interpret_field(cbor_item_t * items, size_t i, union _cbor_value & val);
std::u16string toUTF16(const std::string source);
};
} // namespace cbor
#endif