whatcanGOwrong

This commit is contained in:
2024-09-19 21:38:24 -04:00
commit d0ae4d841d
17908 changed files with 4096831 additions and 0 deletions
@@ -0,0 +1,26 @@
# ClickHouse
`clickhouse://host:port?username=user&password=password&database=clicks&x-multi-statement=true`
| URL Query | Description |
|------------|-------------|
| `x-migrations-table`| Name of the migrations table |
| `x-migrations-table-engine`| Engine to use for the migrations table, defaults to TinyLog |
| `x-cluster-name` | Name of cluster for creating `schema_migrations` table cluster wide |
| `database` | The name of the database to connect to |
| `username` | The user to sign in as |
| `password` | The user's password |
| `host` | The host to connect to. |
| `port` | The port to bind to. |
| `x-multi-statement` | false | Enable multiple statements to be ran in a single migration (See note below) |
## Notes
* The Clickhouse driver does not natively support executing multiple statements in a single query. To allow for multiple statements in a single migration, you can use the `x-multi-statement` param. There are two important caveats:
* This mode splits the migration text into separately-executed statements by a semi-colon `;`. Thus `x-multi-statement` cannot be used when a statement in the migration contains a string with a semi-colon.
* The queries are not executed in any sort of transaction/batch, meaning you are responsible for fixing partial migrations.
* Using the default TinyLog table engine for the schema_versions table prevents backing up the table if using the [clickhouse-backup](https://github.com/AlexAkulov/clickhouse-backup) tool. If backing up the database with make sure the migrations are run with `x-migrations-table-engine=MergeTree`.
* Clickhouse cluster mode is not officially supported, since it's not tested right now, but you can try enabling `schema_migrations` table replication by specifying a `x-cluster-name`:
* When `x-cluster-name` is specified, `x-migrations-table-engine` also should be specified. See the docs regarding [replicated table engines](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/#table_engines-replication).
* When `x-cluster-name` is specified, only the `schema_migrations` table is replicated across the cluster. You still need to write your migrations so that the application tables are replicated within the cluster.
* If you want to create database inside the migration, you should know, that table which will manage migrations `schema-migrations table` will be in `default` table, so you can't use `USE <database_name>` inside migration. In this case you may not specify the database in the connection string (example you can find [here](examples/migrations/003_create_database.up.sql))
@@ -0,0 +1,316 @@
package clickhouse
import (
"database/sql"
"fmt"
"io"
"net/url"
"strconv"
"strings"
"time"
"go.uber.org/atomic"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
"github.com/hashicorp/go-multierror"
)
var (
multiStmtDelimiter = []byte(";")
DefaultMigrationsTable = "schema_migrations"
DefaultMigrationsTableEngine = "TinyLog"
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
ErrNilConfig = fmt.Errorf("no config")
)
type Config struct {
DatabaseName string
ClusterName string
MigrationsTable string
MigrationsTableEngine string
MultiStatementEnabled bool
MultiStatementMaxSize int
}
func init() {
database.Register("clickhouse", &ClickHouse{})
}
func WithInstance(conn *sql.DB, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
}
if err := conn.Ping(); err != nil {
return nil, err
}
ch := &ClickHouse{
conn: conn,
config: config,
}
if err := ch.init(); err != nil {
return nil, err
}
return ch, nil
}
type ClickHouse struct {
conn *sql.DB
config *Config
isLocked atomic.Bool
}
func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
purl, err := url.Parse(dsn)
if err != nil {
return nil, err
}
q := migrate.FilterCustomQuery(purl)
q.Scheme = "tcp"
conn, err := sql.Open("clickhouse", q.String())
if err != nil {
return nil, err
}
multiStatementMaxSize := DefaultMultiStatementMaxSize
if s := purl.Query().Get("x-multi-statement-max-size"); len(s) > 0 {
multiStatementMaxSize, err = strconv.Atoi(s)
if err != nil {
return nil, err
}
}
migrationsTableEngine := DefaultMigrationsTableEngine
if s := purl.Query().Get("x-migrations-table-engine"); len(s) > 0 {
migrationsTableEngine = s
}
ch = &ClickHouse{
conn: conn,
config: &Config{
MigrationsTable: purl.Query().Get("x-migrations-table"),
MigrationsTableEngine: migrationsTableEngine,
DatabaseName: purl.Query().Get("database"),
ClusterName: purl.Query().Get("x-cluster-name"),
MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true",
MultiStatementMaxSize: multiStatementMaxSize,
},
}
if err := ch.init(); err != nil {
return nil, err
}
return ch, nil
}
func (ch *ClickHouse) init() error {
if len(ch.config.DatabaseName) == 0 {
if err := ch.conn.QueryRow("SELECT currentDatabase()").Scan(&ch.config.DatabaseName); err != nil {
return err
}
}
if len(ch.config.MigrationsTable) == 0 {
ch.config.MigrationsTable = DefaultMigrationsTable
}
if ch.config.MultiStatementMaxSize <= 0 {
ch.config.MultiStatementMaxSize = DefaultMultiStatementMaxSize
}
if len(ch.config.MigrationsTableEngine) == 0 {
ch.config.MigrationsTableEngine = DefaultMigrationsTableEngine
}
return ch.ensureVersionTable()
}
func (ch *ClickHouse) Run(r io.Reader) error {
if ch.config.MultiStatementEnabled {
var err error
if e := multistmt.Parse(r, multiStmtDelimiter, ch.config.MultiStatementMaxSize, func(m []byte) bool {
tq := strings.TrimSpace(string(m))
if tq == "" {
return true
}
if _, e := ch.conn.Exec(string(m)); e != nil {
err = database.Error{OrigErr: e, Err: "migration failed", Query: m}
return false
}
return true
}); e != nil {
return e
}
return err
}
migration, err := io.ReadAll(r)
if err != nil {
return err
}
if _, err := ch.conn.Exec(string(migration)); err != nil {
return database.Error{OrigErr: err, Err: "migration failed", Query: migration}
}
return nil
}
func (ch *ClickHouse) Version() (int, bool, error) {
var (
version int
dirty uint8
query = "SELECT version, dirty FROM `" + ch.config.MigrationsTable + "` ORDER BY sequence DESC LIMIT 1"
)
if err := ch.conn.QueryRow(query).Scan(&version, &dirty); err != nil {
if err == sql.ErrNoRows {
return database.NilVersion, false, nil
}
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
}
return version, dirty == 1, nil
}
func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
var (
bool = func(v bool) uint8 {
if v {
return 1
}
return 0
}
tx, err = ch.conn.Begin()
)
if err != nil {
return err
}
query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
return tx.Commit()
}
// ensureVersionTable checks if versions table exists and, if not, creates it.
// Note that this function locks the database, which deviates from the usual
// convention of "caller locks" in the ClickHouse type.
func (ch *ClickHouse) ensureVersionTable() (err error) {
if err = ch.Lock(); err != nil {
return err
}
defer func() {
if e := ch.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()
var (
table string
query = "SHOW TABLES FROM " + quoteIdentifier(ch.config.DatabaseName) + " LIKE '" + ch.config.MigrationsTable + "'"
)
// check if migration table exists
if err := ch.conn.QueryRow(query).Scan(&table); err != nil {
if err != sql.ErrNoRows {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
} else {
return nil
}
// if not, create the empty migration table
if len(ch.config.ClusterName) > 0 {
query = fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine)
} else {
query = fmt.Sprintf(`
CREATE TABLE %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine)
}
if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") {
query = fmt.Sprintf(`%s ORDER BY sequence`, query)
}
if _, err := ch.conn.Exec(query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
return nil
}
func (ch *ClickHouse) Drop() (err error) {
query := "SHOW TABLES FROM " + quoteIdentifier(ch.config.DatabaseName)
tables, err := ch.conn.Query(query)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
defer func() {
if errClose := tables.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()
for tables.Next() {
var table string
if err := tables.Scan(&table); err != nil {
return err
}
query = "DROP TABLE IF EXISTS " + quoteIdentifier(ch.config.DatabaseName) + "." + quoteIdentifier(table)
if _, err := ch.conn.Exec(query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}
if err := tables.Err(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
return nil
}
func (ch *ClickHouse) Lock() error {
if !ch.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
}
func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}
func (ch *ClickHouse) Close() error { return ch.conn.Close() }
// Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611
func quoteIdentifier(name string) string {
end := strings.IndexRune(name, 0)
if end > -1 {
name = name[:end]
}
return `"` + strings.Replace(name, `"`, `""`, -1) + `"`
}
@@ -0,0 +1,224 @@
package clickhouse_test
import (
"context"
"database/sql"
sqldriver "database/sql/driver"
"fmt"
"log"
"testing"
_ "github.com/ClickHouse/clickhouse-go"
"github.com/dhui/dktest"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/clickhouse"
dt "github.com/golang-migrate/migrate/v4/database/testing"
"github.com/golang-migrate/migrate/v4/dktesting"
_ "github.com/golang-migrate/migrate/v4/source/file"
)
const defaultPort = 9000
var (
tableEngines = []string{"TinyLog", "MergeTree"}
opts = dktest.Options{
Env: map[string]string{"CLICKHOUSE_USER": "user", "CLICKHOUSE_PASSWORD": "password", "CLICKHOUSE_DB": "db"},
PortRequired: true, ReadyFunc: isReady,
}
specs = []dktesting.ContainerSpec{
{ImageName: "yandex/clickhouse-server:21.3", Options: opts},
}
)
func clickhouseConnectionString(host, port, engine string) string {
if engine != "" {
return fmt.Sprintf(
"clickhouse://%v:%v?username=user&password=password&database=db&x-multi-statement=true&x-migrations-table-engine=%v&debug=false",
host, port, engine)
}
return fmt.Sprintf(
"clickhouse://%v:%v?username=user&password=password&database=db&x-multi-statement=true&debug=false",
host, port)
}
func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
ip, port, err := c.Port(defaultPort)
if err != nil {
return false
}
db, err := sql.Open("clickhouse", clickhouseConnectionString(ip, port, ""))
if err != nil {
log.Println("open error", err)
return false
}
defer func() {
if err := db.Close(); err != nil {
log.Println("close error:", err)
}
}()
if err = db.PingContext(ctx); err != nil {
switch err {
case sqldriver.ErrBadConn:
return false
default:
fmt.Println(err)
}
return false
}
return true
}
func TestCases(t *testing.T) {
for _, engine := range tableEngines {
t.Run("Test_"+engine, func(t *testing.T) { testSimple(t, engine) })
t.Run("Migrate_"+engine, func(t *testing.T) { testMigrate(t, engine) })
t.Run("Version_"+engine, func(t *testing.T) { testVersion(t, engine) })
t.Run("Drop_"+engine, func(t *testing.T) { testDrop(t, engine) })
}
t.Run("WithInstanceDefaultConfigValues", func(t *testing.T) { testSimpleWithInstanceDefaultConfigValues(t) })
}
func testSimple(t *testing.T, engine string) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}
addr := clickhouseConnectionString(ip, port, engine)
p := &clickhouse.ClickHouse{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()
dt.Test(t, d, []byte("SELECT 1"))
})
}
func testSimpleWithInstanceDefaultConfigValues(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}
addr := clickhouseConnectionString(ip, port, "")
conn, err := sql.Open("clickhouse", addr)
if err != nil {
t.Fatal(err)
}
d, err := clickhouse.WithInstance(conn, &clickhouse.Config{})
if err != nil {
_ = conn.Close()
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()
dt.Test(t, d, []byte("SELECT 1"))
})
}
func testMigrate(t *testing.T, engine string) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}
addr := clickhouseConnectionString(ip, port, engine)
p := &clickhouse.ClickHouse{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()
m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "db", d)
if err != nil {
t.Fatal(err)
}
dt.TestMigrate(t, m)
})
}
func testVersion(t *testing.T, engine string) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
expectedVersion := 1
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}
addr := clickhouseConnectionString(ip, port, engine)
p := &clickhouse.ClickHouse{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()
err = d.SetVersion(expectedVersion, false)
if err != nil {
t.Fatal(err)
}
version, _, err := d.Version()
if err != nil {
t.Fatal(err)
}
if version != expectedVersion {
t.Fatal("Version mismatch")
}
})
}
func testDrop(t *testing.T, engine string) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}
addr := clickhouseConnectionString(ip, port, engine)
p := &clickhouse.ClickHouse{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()
err = d.Drop()
if err != nil {
t.Fatal(err)
}
})
}
@@ -0,0 +1,3 @@
CREATE TABLE test_1 (
Date Date
) Engine=Memory;
@@ -0,0 +1,3 @@
CREATE TABLE test_2 (
Date Date
) Engine=Memory;
@@ -0,0 +1,10 @@
DROP TABLE IF EXISTS driver_ratings;
DROP TABLE IF EXISTS user_ratings;
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS driver_ratings_queue;
DROP TABLE IF EXISTS user_ratings_queue;
DROP TABLE IF EXISTS orders_queue;
DROP VIEW IF EXISTS user_ratings_queue_mv;
DROP VIEW IF EXISTS driver_ratings_queue_mv;
DROP VIEW IF EXISTS orders_queue_mv;
DROP DATABASE IF EXISTS analytics;
@@ -0,0 +1,81 @@
CREATE DATABASE IF NOT EXISTS analytics;
CREATE TABLE IF NOT EXISTS analytics.driver_ratings(
rate UInt8,
userID Int64,
driverID String,
orderID String,
inserted_time DateTime DEFAULT now()
) ENGINE = MergeTree
PARTITION BY driverID
ORDER BY (inserted_time);
CREATE TABLE analytics.driver_ratings_queue(
rate UInt8,
userID Int64,
driverID String,
orderID String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:9092',
kafka_topic_list = 'driver-ratings',
kafka_group_name = 'rating_readers',
kafka_format = 'Avro',
kafka_max_block_size = 1048576;
CREATE MATERIALIZED VIEW analytics.driver_ratings_queue_mv TO analytics.driver_ratings AS
SELECT rate, userID, driverID, orderID
FROM analytics.driver_ratings_queue;
CREATE TABLE IF NOT EXISTS analytics.user_ratings(
rate UInt8,
userID Int64,
driverID String,
orderID String,
inserted_time DateTime DEFAULT now()
) ENGINE = MergeTree
PARTITION BY userID
ORDER BY (inserted_time);
CREATE TABLE analytics.user_ratings_queue(
rate UInt8,
userID Int64,
driverID String,
orderID String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:9092',
kafka_topic_list = 'user-ratings',
kafka_group_name = 'rating_readers',
kafka_format = 'JSON',
kafka_max_block_size = 1048576;
CREATE MATERIALIZED VIEW analytics.user_ratings_queue_mv TO analytics.user_ratings AS
SELECT rate, userID, driverID, orderID
FROM analytics.user_ratings_queue;
CREATE TABLE IF NOT EXISTS analytics.orders(
from_place String,
to_place String,
userID Int64,
driverID String,
orderID String,
inserted_time DateTime DEFAULT now()
) ENGINE = MergeTree
PARTITION BY driverID
ORDER BY (inserted_time);
CREATE TABLE analytics.orders_queue(
from_place String,
to_place String,
userID Int64,
driverID String,
orderID String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:9092',
kafka_topic_list = 'orders',
kafka_group_name = 'order_readers',
kafka_format = 'Avro',
kafka_max_block_size = 1048576;
CREATE MATERIALIZED VIEW analytics.orders_queue_mv TO orders AS
SELECT from_place, to_place, userID, driverID, orderID
FROM analytics.orders_queue;